diff --git a/central/sensor/service/common/message_injector.go b/central/sensor/service/common/message_injector.go index 387a1539ab757..3e2cdab17fd61 100644 --- a/central/sensor/service/common/message_injector.go +++ b/central/sensor/service/common/message_injector.go @@ -2,6 +2,7 @@ package common import ( "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" ) @@ -9,4 +10,5 @@ import ( type MessageInjector interface { InjectMessage(ctx concurrency.Waitable, msg *central.MsgToSensor) error InjectMessageIntoQueue(msg *central.MsgFromSensor) + HasCapability(capability centralsensor.SensorCapability) bool } diff --git a/central/sensor/service/common/sensor_ack.go b/central/sensor/service/common/sensor_ack.go new file mode 100644 index 0000000000000..ca7404c1d339a --- /dev/null +++ b/central/sensor/service/common/sensor_ack.go @@ -0,0 +1,51 @@ +package common + +import ( + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/centralsensor" + "github.com/stackrox/rox/pkg/concurrency" +) + +// SendSensorACK sends a SensorACK only when sensor capability support is explicitly advertised. +func SendSensorACK(ctx concurrency.Waitable, action central.SensorACK_Action, messageType central.SensorACK_MessageType, resourceID, reason string, injector MessageInjector) { + if injector == nil { + return + } + + if !injector.HasCapability(centralsensor.SensorACKSupport) { + return + } + + if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ + Msg: ¢ral.MsgToSensor_SensorAck{ + SensorAck: ¢ral.SensorACK{ + Action: action, + MessageType: messageType, + ResourceId: resourceID, + Reason: reason, + }, + }, + }); err != nil { + log.Warnf("Failed injecting SensorACK (%v) for %v (resource_id=%s): %v", action, messageType, resourceID, err) + } +} + +// SendLegacyNodeInventoryACK sends the legacy NodeInventoryACK message supported since version 4.1. +func SendLegacyNodeInventoryACK(ctx concurrency.Waitable, clusterID, nodeName string, action central.NodeInventoryACK_Action, messageType central.NodeInventoryACK_MessageType, injector MessageInjector) { + if injector == nil { + return + } + + if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ + Msg: ¢ral.MsgToSensor_NodeInventoryAck{ + NodeInventoryAck: ¢ral.NodeInventoryACK{ + ClusterId: clusterID, + NodeName: nodeName, + Action: action, + MessageType: messageType, + }, + }, + }); err != nil { + log.Warnf("Failed injecting legacy NodeInventoryACK (%v) for cluster=%s node=%s: %v", messageType, clusterID, nodeName, err) + } +} diff --git a/central/sensor/service/connection/upgradecontroller/register_connection_test.go b/central/sensor/service/connection/upgradecontroller/register_connection_test.go index be02b8f750800..00ab06834625d 100644 --- a/central/sensor/service/connection/upgradecontroller/register_connection_test.go +++ b/central/sensor/service/connection/upgradecontroller/register_connection_test.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/version/testutils" "github.com/stretchr/testify/assert" @@ -86,6 +87,10 @@ func (f fakeConnection) InjectMessageIntoQueue(_ *central.MsgFromSensor) { panic("not implemented") } +func (f fakeConnection) HasCapability(_ centralsensor.SensorCapability) bool { + return false +} + func (f fakeConnection) CheckAutoUpgradeSupport() error { if f.autoUpgradeSupported { return nil diff --git a/central/sensor/service/connection/upgradecontroller/upgrade_controller_test.go b/central/sensor/service/connection/upgradecontroller/upgrade_controller_test.go index f77903e231d7a..040a06daf8d76 100644 --- a/central/sensor/service/connection/upgradecontroller/upgrade_controller_test.go +++ b/central/sensor/service/connection/upgradecontroller/upgrade_controller_test.go @@ -9,6 +9,7 @@ import ( "github.com/stackrox/rox/central/sensor/service/connection/upgradecontroller/stateutils" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/protoassert" "github.com/stackrox/rox/pkg/sensorupgrader" @@ -95,6 +96,10 @@ func (r *recordingConn) InjectMessage(_ concurrency.Waitable, msg *central.MsgTo func (r *recordingConn) InjectMessageIntoQueue(_ *central.MsgFromSensor) {} +func (*recordingConn) HasCapability(_ centralsensor.SensorCapability) bool { + return false +} + func (r *recordingConn) getSentTriggers() []*central.SensorUpgradeTrigger { r.lock.Lock() defer r.lock.Unlock() diff --git a/central/sensor/service/pipeline/nodeindex/pipeline.go b/central/sensor/service/pipeline/nodeindex/pipeline.go index 77e867b4fd49c..03c1b2b6c686c 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline.go @@ -130,32 +130,16 @@ func sendComplianceAck(ctx context.Context, node *storage.Node, injector common. if injector == nil { return } - // Always send SensorACK (new path). - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_SensorAck{ - SensorAck: ¢ral.SensorACK{ - Action: central.SensorACK_ACK, - MessageType: central.SensorACK_NODE_INDEX_REPORT, - ResourceId: node.GetName(), - }, - }, - }); err != nil { - log.Warnf("Failed injecting SensorACK for node index report (node=%s): %v", node.GetName(), err) - } - - // Always send legacy NodeInventoryACK for backward compatibility. - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_NodeInventoryAck{ - NodeInventoryAck: ¢ral.NodeInventoryACK{ - ClusterId: node.GetClusterId(), - NodeName: node.GetName(), - Action: central.NodeInventoryACK_ACK, - MessageType: central.NodeInventoryACK_NodeIndexer, - }, - }, - }); err != nil { - log.Warnf("Failed injecting legacy NodeInventoryACK for node index report (node=%s): %v", node.GetName(), err) - } + common.SendSensorACK(ctx, central.SensorACK_ACK, central.SensorACK_NODE_INDEX_REPORT, node.GetName(), "", injector) + + common.SendLegacyNodeInventoryACK( + ctx, + node.GetClusterId(), + node.GetName(), + central.NodeInventoryACK_ACK, + central.NodeInventoryACK_NodeIndexer, + injector, + ) log.Debugf("Sent node-indexing ACKs for node %s in cluster %s", node.GetName(), node.GetClusterId()) } diff --git a/central/sensor/service/pipeline/nodeindex/pipeline_test.go b/central/sensor/service/pipeline/nodeindex/pipeline_test.go index b62b4342a49e8..5ea4a302e2ec8 100644 --- a/central/sensor/service/pipeline/nodeindex/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeindex/pipeline_test.go @@ -9,6 +9,7 @@ import ( "github.com/stackrox/rox/generated/internalapi/central" v4 "github.com/stackrox/rox/generated/internalapi/scanner/v4" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/features" nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks" @@ -103,7 +104,11 @@ func TestPipelineSendsSensorAndLegacyACKs(t *testing.T) { riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), ) - injector := &recordingInjector{} + injector := &recordingInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } p := &pipelineImpl{ clusterStore: clusterStore, nodeDatastore: nodeDatastore, @@ -132,6 +137,53 @@ func TestPipelineSendsSensorAndLegacyACKs(t *testing.T) { }, injector.getSentACKs()) } +func TestPipelineSkipsSensorACKWhenCapabilityMissing(t *testing.T) { + t.Setenv(features.NodeIndexEnabled.EnvVar(), "true") + t.Setenv(features.ScannerV4.EnvVar(), "true") + + ctrl := gomock.NewController(t) + clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl) + nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl) + riskManager := riskManagerMocks.NewMockManager(ctrl) + enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl) + + node := storage.Node{ + Id: "1", + Name: "node-name", + ClusterId: "cluster-id", + } + msg := createMsg(mockIndexReport) + + gomock.InOrder( + nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), + enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), nil, gomock.Any()).Times(1).Return(nil), + riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), + ) + + injector := &recordingInjector{ + capabilities: map[centralsensor.SensorCapability]bool{}, + } + p := &pipelineImpl{ + clusterStore: clusterStore, + nodeDatastore: nodeDatastore, + riskManager: riskManager, + enricher: enricher, + } + + err := p.Run(t.Context(), node.GetClusterId(), msg, injector) + assert.NoError(t, err) + assert.Empty(t, injector.getSentSensorACKs(), "sensor ACK should be skipped without SensorACKSupport") + + protoassert.SlicesEqual(t, []*central.NodeInventoryACK{ + { + ClusterId: node.GetClusterId(), + NodeName: node.GetName(), + Action: central.NodeInventoryACK_ACK, + MessageType: central.NodeInventoryACK_NodeIndexer, + }, + }, injector.getSentACKs()) +} + func createMsg(ir *v4.IndexReport) *central.MsgFromSensor { return ¢ral.MsgFromSensor{ Msg: ¢ral.MsgFromSensor_Event{ @@ -146,9 +198,14 @@ func createMsg(ir *v4.IndexReport) *central.MsgFromSensor { } type recordingInjector struct { - lock sync.Mutex - legacy []*central.NodeInventoryACK - sensor []*central.SensorACK + lock sync.Mutex + legacy []*central.NodeInventoryACK + sensor []*central.SensorACK + capabilities map[centralsensor.SensorCapability]bool +} + +func (r *recordingInjector) HasCapability(cap centralsensor.SensorCapability) bool { + return r.capabilities[cap] } func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error { diff --git a/central/sensor/service/pipeline/nodeinventory/pipeline.go b/central/sensor/service/pipeline/nodeinventory/pipeline.go index 965c1f6a1802f..bdbf01ad22cdf 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline.go @@ -144,32 +144,16 @@ func replyCompliance(ctx context.Context, clusterID, nodeName string, t central. return } - // Always send SensorACK (new path). - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_SensorAck{ - SensorAck: ¢ral.SensorACK{ - Action: convertLegacyActionToSensor(t), - MessageType: central.SensorACK_NODE_INVENTORY, - ResourceId: nodeName, - }, - }, - }); err != nil { - log.Warnf("Failed injecting SensorACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err) - } - - // Always send legacy NodeInventoryACK for backward compatibility. - if err := injector.InjectMessage(ctx, ¢ral.MsgToSensor{ - Msg: ¢ral.MsgToSensor_NodeInventoryAck{ - NodeInventoryAck: ¢ral.NodeInventoryACK{ - ClusterId: clusterID, - NodeName: nodeName, - Action: t, - MessageType: central.NodeInventoryACK_NodeInventory, - }, - }, - }); err != nil { - log.Warnf("Failed injecting legacy NodeInventoryACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err) - } + common.SendSensorACK(ctx, convertLegacyActionToSensor(t), central.SensorACK_NODE_INVENTORY, nodeName, "", injector) + + common.SendLegacyNodeInventoryACK( + ctx, + clusterID, + nodeName, + t, + central.NodeInventoryACK_NodeInventory, + injector, + ) log.Debugf("Sent node-inventory ACKs for node %s in cluster %s", nodeName, clusterID) } diff --git a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go index 3763043c8e7a6..54cbf4fee1dce 100644 --- a/central/sensor/service/pipeline/nodeinventory/pipeline_test.go +++ b/central/sensor/service/pipeline/nodeinventory/pipeline_test.go @@ -11,6 +11,7 @@ import ( "github.com/stackrox/rox/central/sensor/service/common" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks" "github.com/stackrox/rox/pkg/protoassert" @@ -200,7 +201,11 @@ func Test_pipelineImpl_Run_SendsSensorAndLegacyACKs(t *testing.T) { }, }, } - injector := &recordingInjector{} + injector := &recordingInjector{ + capabilities: map[centralsensor.SensorCapability]bool{ + centralsensor.SensorACKSupport: true, + }, + } gomock.InOrder( nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), @@ -236,12 +241,74 @@ func Test_pipelineImpl_Run_SendsSensorAndLegacyACKs(t *testing.T) { }, injector.getSentSensorACKs(), "sensor ACKs") } +func Test_pipelineImpl_Run_SkipsSensorACKWhenCapabilityMissing(t *testing.T) { + ctrl := gomock.NewController(t) + clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl) + nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl) + riskManager := riskManagerMocks.NewMockManager(ctrl) + enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl) + + clusterID := "cluster-1" + nodeName := "node-name" + node := storage.Node{ + Id: "node-id", + ClusterId: clusterID, + } + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_Event{ + Event: ¢ral.SensorEvent{ + Action: central.ResourceAction_CREATE_RESOURCE, + Resource: ¢ral.SensorEvent_NodeInventory{ + NodeInventory: &storage.NodeInventory{ + NodeId: node.GetId(), + NodeName: nodeName, + }, + }, + }, + }, + } + injector := &recordingInjector{ + capabilities: map[centralsensor.SensorCapability]bool{}, + } + + gomock.InOrder( + nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil), + enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), gomock.Any(), nil).Times(1).Return(nil), + riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil), + ) + + p := &pipelineImpl{ + clusterStore: clusterStore, + nodeDatastore: nodeDatastore, + enricher: enricher, + riskManager: riskManager, + } + + err := p.Run(context.Background(), clusterID, msg, injector) + assert.NoError(t, err) + assert.Empty(t, injector.getSentSensorACKs(), "sensor ACK should be skipped without SensorACKSupport") + + protoassert.SlicesEqual(t, []*central.NodeInventoryACK{ + { + ClusterId: clusterID, + NodeName: nodeName, + Action: central.NodeInventoryACK_ACK, + MessageType: central.NodeInventoryACK_NodeInventory, + }, + }, injector.getSentACKs(), "legacy ACKs") +} + var _ common.MessageInjector = (*recordingInjector)(nil) type recordingInjector struct { - lock sync.Mutex - messages []*central.NodeInventoryACK - sensor []*central.SensorACK + lock sync.Mutex + messages []*central.NodeInventoryACK + sensor []*central.SensorACK + capabilities map[centralsensor.SensorCapability]bool +} + +func (r *recordingInjector) HasCapability(cap centralsensor.SensorCapability) bool { + return r.capabilities[cap] } func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error {