From 30e2de3a42acd92230762f01109ba5adec0175db Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:22:38 +0100 Subject: [PATCH 1/2] Add sensor cap checks for SensorAck for node scanning --- central/sensor/service/common/sensor_ack.go | 56 ++++++++++++++ .../service/pipeline/nodeindex/pipeline.go | 36 +++------ .../pipeline/nodeindex/pipeline_test.go | 63 +++++++++++++++- .../pipeline/nodeinventory/pipeline.go | 36 +++------ .../pipeline/nodeinventory/pipeline_test.go | 73 ++++++++++++++++++- 5 files changed, 210 insertions(+), 54 deletions(-) create mode 100644 central/sensor/service/common/sensor_ack.go diff --git a/central/sensor/service/common/sensor_ack.go b/central/sensor/service/common/sensor_ack.go new file mode 100644 index 0000000000000..6730ee7d99118 --- /dev/null +++ b/central/sensor/service/common/sensor_ack.go @@ -0,0 +1,56 @@ +package common + +import ( + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/centralsensor" + "github.com/stackrox/rox/pkg/concurrency" +) + +type capabilityChecker interface { + HasCapability(centralsensor.SensorCapability) bool +} + +// SendSensorACK sends a SensorACK only when sensor capability support is explicitly advertised. +func SendSensorACK(ctx concurrency.Waitable, action central.SensorACK_Action, messageType central.SensorACK_MessageType, resourceID, reason string, injector MessageInjector) { + if injector == nil { + return + } + + checker, ok := injector.(capabilityChecker) + if !ok || !checker.HasCapability(centralsensor.SensorACKSupport) { + return + } + + if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ + Msg: ¢ral.MsgToSensor_SensorAck{ + SensorAck: ¢ral.SensorACK{ + Action: action, + MessageType: messageType, + ResourceId: resourceID, + Reason: reason, + }, + }, + }); err != nil { + log.Warnf("Failed injecting SensorACK (%v) for %v (resource_id=%s): %v", action, messageType, resourceID, err) + } +} + +// SendLegacyNodeInventoryACK sends the legacy NodeInventoryACK message supported since version 4.1. +func SendLegacyNodeInventoryACK(ctx concurrency.Waitable, clusterID, nodeName string, action central.NodeInventoryACK_Action, messageType central.NodeInventoryACK_MessageType, injector MessageInjector) { + if injector == nil { + return + } + + if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ + Msg: ¢ral.MsgToSensor_NodeInventoryAck{ + NodeInventoryAck: ¢ral.NodeInventoryACK{ + ClusterId: clusterID, + NodeName: nodeName, + Action: action, + MessageType: messageType, + }, + }, + }); err != nil { + log.Warnf("Failed injecting legacy NodeInventoryACK (%v) for cluster=%s node=%s: %v", messageType, clusterID, nodeName, err) + } +} diff --git a/central/sensor/service/pipeline/nodeindex/pipeline.go b/central/sensor/service/pipeline/nodeindex/pipeline.go index 77e867b4fd49c..03c1b2b6c686c 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline.go @@ -130,32 +130,16 @@ func sendComplianceAck(ctx context.Context, node *storage.Node, injector common. if injector == nil { return } - // Always send SensorACK (new path). - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_SensorAck{ - SensorAck: ¢ral.SensorACK{ - Action: central.SensorACK_ACK, - MessageType: central.SensorACK_NODE_INDEX_REPORT, - ResourceId: node.GetName(), - }, - }, - }); err != nil { - log.Warnf("Failed injecting SensorACK for node index report (node=%s): %v", node.GetName(), err) - } - - // Always send legacy NodeInventoryACK for backward compatibility. - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_NodeInventoryAck{ - NodeInventoryAck: ¢ral.NodeInventoryACK{ - ClusterId: node.GetClusterId(), - NodeName: node.GetName(), - Action: central.NodeInventoryACK_ACK, - MessageType: central.NodeInventoryACK_NodeIndexer, - }, - }, - }); err != nil { - log.Warnf("Failed injecting legacy NodeInventoryACK for node index report (node=%s): %v", node.GetName(), err) - } + common.SendSensorACK(ctx, central.SensorACK_ACK, central.SensorACK_NODE_INDEX_REPORT, node.GetName(), "", injector) + + common.SendLegacyNodeInventoryACK( + ctx, + node.GetClusterId(), + node.GetName(), + central.NodeInventoryACK_ACK, + central.NodeInventoryACK_NodeIndexer, + injector, + ) log.Debugf("Sent node-indexing ACKs for node %s in cluster %s", node.GetName(), node.GetClusterId()) } diff --git a/central/sensor/service/pipeline/nodeindex/pipeline_test.go b/central/sensor/service/pipeline/nodeindex/pipeline_test.go index b62b4342a49e8..f13934f782759 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline_test.go @@ -9,6 +9,7 @@ import ( "github.com/stackrox/rox/generated/internalapi/central" v4 "github.com/stackrox/rox/generated/internalapi/scanner/v4" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/features" nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks" @@ -103,7 +104,11 @@ func TestPipelineSendsSensorAndLegacyACKs(t *testing.T) { riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), ) - injector := &recordingInjector{} + injector := &recordingInjectorWithCapabilities{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } p := &pipelineImpl{ clusterStore: clusterStore, nodeDatastore: nodeDatastore, @@ -132,6 +137,53 @@ func TestPipelineSendsSensorAndLegacyACKs(t *testing.T) { }, injector.getSentACKs()) } +func TestPipelineSkipsSensorACKWhenCapabilityMissing(t *testing.T) { + t.Setenv(features.NodeIndexEnabled.EnvVar(), "true") + t.Setenv(features.ScannerV4.EnvVar(), "true") + + ctrl := gomock.NewController(t) + clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl) + nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl) + riskManager := riskManagerMocks.NewMockManager(ctrl) + enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl) + + node := storage.Node{ + Id: "1", + Name: "node-name", + ClusterId: "cluster-id", + } + msg := createMsg(mockIndexReport) + + gomock.InOrder( + nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), + enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), nil, gomock.Any()).Times(1).Return(nil), + riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), + ) + + injector := &recordingInjectorWithCapabilities{ + capabilities: map[centralsensor.SensorCapability]bool{}, + } + p := &pipelineImpl{ + clusterStore: clusterStore, + nodeDatastore: nodeDatastore, + riskManager: riskManager, + enricher: enricher, + } + + err := p.Run(t.Context(), node.GetClusterId(), msg, injector) + assert.NoError(t, err) + assert.Empty(t, injector.getSentSensorACKs(), "sensor ACK should be skipped without SensorACKSupport") + + protoassert.SlicesEqual(t, []*central.NodeInventoryACK{ + { + ClusterId: node.GetClusterId(), + NodeName: node.GetName(), + Action: central.NodeInventoryACK_ACK, + MessageType: central.NodeInventoryACK_NodeIndexer, + }, + }, injector.getSentACKs()) +} + func createMsg(ir *v4.IndexReport) *central.MsgFromSensor { return ¢ral.MsgFromSensor{ Msg: ¢ral.MsgFromSensor_Event{ @@ -151,6 +203,11 @@ type recordingInjector struct { sensor []*central.SensorACK } +type recordingInjectorWithCapabilities struct { + recordingInjector + capabilities map[centralsensor.SensorCapability]bool +} + func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { r.lock.Lock() defer r.lock.Unlock() @@ -165,6 +222,10 @@ func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.M func (r *recordingInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} +func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool { + return r.capabilities[cap] +} + func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { r.lock.Lock() defer r.lock.Unlock() diff --git a/central/sensor/service/pipeline/nodeinventory/pipeline.go b/central/sensor/service/pipeline/nodeinventory/pipeline.go index 965c1f6a1802f..bdbf01ad22cdf 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline.go @@ -144,32 +144,16 @@ func replyCompliance(ctx context.Context, clusterID, nodeName string, t central. return } - // Always send SensorACK (new path). - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_SensorAck{ - SensorAck: ¢ral.SensorACK{ - Action: convertLegacyActionToSensor(t), - MessageType: central.SensorACK_NODE_INVENTORY, - ResourceId: nodeName, - }, - }, - }); err != nil { - log.Warnf("Failed injecting SensorACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err) - } - - // Always send legacy NodeInventoryACK for backward compatibility. - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_NodeInventoryAck{ - NodeInventoryAck: ¢ral.NodeInventoryACK{ - ClusterId: clusterID, - NodeName: nodeName, - Action: t, - MessageType: central.NodeInventoryACK_NodeInventory, - }, - }, - }); err != nil { - log.Warnf("Failed injecting legacy NodeInventoryACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err) - } + common.SendSensorACK(ctx, convertLegacyActionToSensor(t), central.SensorACK_NODE_INVENTORY, nodeName, "", injector) + + common.SendLegacyNodeInventoryACK( + ctx, + clusterID, + nodeName, + t, + central.NodeInventoryACK_NodeInventory, + injector, + ) log.Debugf("Sent node-inventory ACKs for node %s in cluster %s", nodeName, clusterID) } diff --git a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go index 3763043c8e7a6..eaaa0aa1e3fa2 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go @@ -11,6 +11,7 @@ import ( "github.com/stackrox/rox/central/sensor/service/common" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks" "github.com/stackrox/rox/pkg/protoassert" @@ -200,7 +201,11 @@ func Test_pipelineImpl_Run_SendsSensorAndLegacyACKs(t *testing.T) { }, }, } - injector := &recordingInjector{} + injector := &recordingInjectorWithCapabilities{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } gomock.InOrder( nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), @@ -236,6 +241,63 @@ func Test_pipelineImpl_Run_SendsSensorAndLegacyACKs(t *testing.T) { }, injector.getSentSensorACKs(), "sensor ACKs") } +func Test_pipelineImpl_Run_SkipsSensorACKWhenCapabilityMissing(t *testing.T) { + ctrl := gomock.NewController(t) + clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl) + nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl) + riskManager := riskManagerMocks.NewMockManager(ctrl) + enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl) + + clusterID := "cluster-1" + nodeName := "node-name" + node := storage.Node{ + Id: "node-id", + ClusterId: clusterID, + } + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_Event{ + Event: ¢ral.SensorEvent{ + Action: central.ResourceAction_CREATE_RESOURCE, + Resource: ¢ral.SensorEvent_NodeInventory{ + NodeInventory: &storage.NodeInventory{ + NodeId: node.GetId(), + NodeName: nodeName, + }, + }, + }, + }, + } + injector := &recordingInjectorWithCapabilities{ + capabilities: map[centralsensor.SensorCapability]bool{}, + } + + gomock.InOrder( + nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), + enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), gomock.Any(), nil).Times(1).Return(nil), + riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), + ) + + p := &pipelineImpl{ + clusterStore: clusterStore, + nodeDatastore: nodeDatastore, + enricher: enricher, + riskManager: riskManager, + } + + err := p.Run(context.Background(), clusterID, msg, injector) + assert.NoError(t, err) + assert.Empty(t, injector.getSentSensorACKs(), "sensor ACK should be skipped without SensorACKSupport") + + protoassert.SlicesEqual(t, []*central.NodeInventoryACK{ + { + ClusterId: clusterID, + NodeName: nodeName, + Action: central.NodeInventoryACK_ACK, + MessageType: central.NodeInventoryACK_NodeInventory, + }, + }, injector.getSentACKs(), "legacy ACKs") +} + var _ common.MessageInjector = (*recordingInjector)(nil) type recordingInjector struct { @@ -244,6 +306,11 @@ type recordingInjector struct { sensor []*central.SensorACK } +type recordingInjectorWithCapabilities struct { + recordingInjector + capabilities map[centralsensor.SensorCapability]bool +} + func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { r.lock.Lock() defer r.lock.Unlock() @@ -258,6 +325,10 @@ func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.M func (r *recordingInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} +func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool { + return r.capabilities[cap] +} + func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { r.lock.Lock() defer r.lock.Unlock() From 8b9adc1f1406c3c8ca012c83e12ab130ea2dc3dc Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:35:16 +0100 Subject: [PATCH 2/2] Reorder functions in test mock --- .../sensor/service/pipeline/nodeindex/pipeline_test.go | 8 ++++---- .../service/pipeline/nodeinventory/pipeline_test.go | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/central/sensor/service/pipeline/nodeindex/pipeline_test.go b/central/sensor/service/pipeline/nodeindex/pipeline_test.go index f13934f782759..a7d73ef26e78f 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline_test.go @@ -208,6 +208,10 @@ type recordingInjectorWithCapabilities struct { capabilities map[centralsensor.SensorCapability]bool } +func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool { + return r.capabilities[cap] +} + func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { r.lock.Lock() defer r.lock.Unlock() @@ -222,10 +226,6 @@ func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.M func (r *recordingInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} -func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool { - return r.capabilities[cap] -} - func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { r.lock.Lock() defer r.lock.Unlock() diff --git a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go index eaaa0aa1e3fa2..6d3a9df9dd639 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go @@ -311,6 +311,10 @@ type recordingInjectorWithCapabilities struct { capabilities map[centralsensor.SensorCapability]bool } +func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool { + return r.capabilities[cap] +} + func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { r.lock.Lock() defer r.lock.Unlock() @@ -325,10 +329,6 @@ func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.M func (r *recordingInjector) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} -func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool { - return r.capabilities[cap] -} - func (r *recordingInjector) getSentACKs() []*central.NodeInventoryACK { r.lock.Lock() defer r.lock.Unlock()