Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions central/sensor/service/common/sensor_ack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package common

import (
"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/pkg/centralsensor"
"github.com/stackrox/rox/pkg/concurrency"
)

type capabilityChecker interface {
HasCapability(centralsensor.SensorCapability) bool
}

// SendSensorACK sends a SensorACK only when sensor capability support is explicitly advertised.
func SendSensorACK(ctx concurrency.Waitable, action central.SensorACK_Action, messageType central.SensorACK_MessageType, resourceID, reason string, injector MessageInjector) {
if injector == nil {
return
}

checker, ok := injector.(capabilityChecker)
if !ok || !checker.HasCapability(centralsensor.SensorACKSupport) {
return
}

if err := injector.InjectMessage(ctx, &central.MsgToSensor{
Msg: &central.MsgToSensor_SensorAck{
SensorAck: &central.SensorACK{
Action: action,
MessageType: messageType,
ResourceId: resourceID,
Reason: reason,
},
},
}); err != nil {
log.Warnf("Failed injecting SensorACK (%v) for %v (resource_id=%s): %v", action, messageType, resourceID, err)
}
}

// SendLegacyNodeInventoryACK sends the legacy NodeInventoryACK message supported since version 4.1.
func SendLegacyNodeInventoryACK(ctx concurrency.Waitable, clusterID, nodeName string, action central.NodeInventoryACK_Action, messageType central.NodeInventoryACK_MessageType, injector MessageInjector) {
if injector == nil {
return
}

if err := injector.InjectMessage(ctx, &central.MsgToSensor{
Msg: &central.MsgToSensor_NodeInventoryAck{
NodeInventoryAck: &central.NodeInventoryACK{
ClusterId: clusterID,
NodeName: nodeName,
Action: action,
MessageType: messageType,
},
},
}); err != nil {
log.Warnf("Failed injecting legacy NodeInventoryACK (%v) for cluster=%s node=%s: %v", messageType, clusterID, nodeName, err)
}
}
36 changes: 10 additions & 26 deletions central/sensor/service/pipeline/nodeindex/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,32 +130,16 @@ func sendComplianceAck(ctx context.Context, node *storage.Node, injector common.
if injector == nil {
return
}
// Always send SensorACK (new path).
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
Msg: &central.MsgToSensor_SensorAck{
SensorAck: &central.SensorACK{
Action: central.SensorACK_ACK,
MessageType: central.SensorACK_NODE_INDEX_REPORT,
ResourceId: node.GetName(),
},
},
}); err != nil {
log.Warnf("Failed injecting SensorACK for node index report (node=%s): %v", node.GetName(), err)
}

// Always send legacy NodeInventoryACK for backward compatibility.
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
Msg: &central.MsgToSensor_NodeInventoryAck{
NodeInventoryAck: &central.NodeInventoryACK{
ClusterId: node.GetClusterId(),
NodeName: node.GetName(),
Action: central.NodeInventoryACK_ACK,
MessageType: central.NodeInventoryACK_NodeIndexer,
},
},
}); err != nil {
log.Warnf("Failed injecting legacy NodeInventoryACK for node index report (node=%s): %v", node.GetName(), err)
}
common.SendSensorACK(ctx, central.SensorACK_ACK, central.SensorACK_NODE_INDEX_REPORT, node.GetName(), "", injector)

common.SendLegacyNodeInventoryACK(
ctx,
node.GetClusterId(),
node.GetName(),
central.NodeInventoryACK_ACK,
central.NodeInventoryACK_NodeIndexer,
injector,
)

log.Debugf("Sent node-indexing ACKs for node %s in cluster %s", node.GetName(), node.GetClusterId())
}
63 changes: 62 additions & 1 deletion central/sensor/service/pipeline/nodeindex/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stackrox/rox/generated/internalapi/central"
v4 "github.com/stackrox/rox/generated/internalapi/scanner/v4"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/centralsensor"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/features"
nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks"
Expand Down Expand Up @@ -103,7 +104,11 @@ func TestPipelineSendsSensorAndLegacyACKs(t *testing.T) {
riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil),
)

injector := &recordingInjector{}
injector := &recordingInjectorWithCapabilities{
capabilities: map[centralsensor.SensorCapability]bool{
centralsensor.SensorACKSupport: true,
},
}
p := &pipelineImpl{
clusterStore: clusterStore,
nodeDatastore: nodeDatastore,
Expand Down Expand Up @@ -132,6 +137,53 @@ func TestPipelineSendsSensorAndLegacyACKs(t *testing.T) {
}, injector.getSentACKs())
}

func TestPipelineSkipsSensorACKWhenCapabilityMissing(t *testing.T) {
t.Setenv(features.NodeIndexEnabled.EnvVar(), "true")
t.Setenv(features.ScannerV4.EnvVar(), "true")

ctrl := gomock.NewController(t)
clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl)
nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl)
riskManager := riskManagerMocks.NewMockManager(ctrl)
enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl)

node := storage.Node{
Id: "1",
Name: "node-name",
ClusterId: "cluster-id",
}
msg := createMsg(mockIndexReport)

gomock.InOrder(
nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil),
enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), nil, gomock.Any()).Times(1).Return(nil),
riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil),
)

injector := &recordingInjectorWithCapabilities{
capabilities: map[centralsensor.SensorCapability]bool{},
}
p := &pipelineImpl{
clusterStore: clusterStore,
nodeDatastore: nodeDatastore,
riskManager: riskManager,
enricher: enricher,
}

err := p.Run(t.Context(), node.GetClusterId(), msg, injector)
assert.NoError(t, err)
assert.Empty(t, injector.getSentSensorACKs(), "sensor ACK should be skipped without SensorACKSupport")

protoassert.SlicesEqual(t, []*central.NodeInventoryACK{
{
ClusterId: node.GetClusterId(),
NodeName: node.GetName(),
Action: central.NodeInventoryACK_ACK,
MessageType: central.NodeInventoryACK_NodeIndexer,
},
}, injector.getSentACKs())
}

func createMsg(ir *v4.IndexReport) *central.MsgFromSensor {
return &central.MsgFromSensor{
Msg: &central.MsgFromSensor_Event{
Expand All @@ -151,6 +203,15 @@ type recordingInjector struct {
sensor []*central.SensorACK
}

type recordingInjectorWithCapabilities struct {
recordingInjector
capabilities map[centralsensor.SensorCapability]bool
}

func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool {
return r.capabilities[cap]
}

func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error {
r.lock.Lock()
defer r.lock.Unlock()
Expand Down
36 changes: 10 additions & 26 deletions central/sensor/service/pipeline/nodeinventory/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,32 +144,16 @@ func replyCompliance(ctx context.Context, clusterID, nodeName string, t central.
return
}

// Always send SensorACK (new path).
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
Msg: &central.MsgToSensor_SensorAck{
SensorAck: &central.SensorACK{
Action: convertLegacyActionToSensor(t),
MessageType: central.SensorACK_NODE_INVENTORY,
ResourceId: nodeName,
},
},
}); err != nil {
log.Warnf("Failed injecting SensorACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err)
}

// Always send legacy NodeInventoryACK for backward compatibility.
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
Msg: &central.MsgToSensor_NodeInventoryAck{
NodeInventoryAck: &central.NodeInventoryACK{
ClusterId: clusterID,
NodeName: nodeName,
Action: t,
MessageType: central.NodeInventoryACK_NodeInventory,
},
},
}); err != nil {
log.Warnf("Failed injecting legacy NodeInventoryACK for node inventory (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err)
}
common.SendSensorACK(ctx, convertLegacyActionToSensor(t), central.SensorACK_NODE_INVENTORY, nodeName, "", injector)

common.SendLegacyNodeInventoryACK(
ctx,
clusterID,
nodeName,
t,
central.NodeInventoryACK_NodeInventory,
injector,
)

log.Debugf("Sent node-inventory ACKs for node %s in cluster %s", nodeName, clusterID)
}
Expand Down
73 changes: 72 additions & 1 deletion central/sensor/service/pipeline/nodeinventory/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stackrox/rox/central/sensor/service/common"
"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/centralsensor"
"github.com/stackrox/rox/pkg/concurrency"
nodesEnricherMocks "github.com/stackrox/rox/pkg/nodes/enricher/mocks"
"github.com/stackrox/rox/pkg/protoassert"
Expand Down Expand Up @@ -200,7 +201,11 @@ func Test_pipelineImpl_Run_SendsSensorAndLegacyACKs(t *testing.T) {
},
},
}
injector := &recordingInjector{}
injector := &recordingInjectorWithCapabilities{
capabilities: map[centralsensor.SensorCapability]bool{
centralsensor.SensorACKSupport: true,
},
}

gomock.InOrder(
nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil),
Expand Down Expand Up @@ -236,6 +241,63 @@ func Test_pipelineImpl_Run_SendsSensorAndLegacyACKs(t *testing.T) {
}, injector.getSentSensorACKs(), "sensor ACKs")
}

func Test_pipelineImpl_Run_SkipsSensorACKWhenCapabilityMissing(t *testing.T) {
ctrl := gomock.NewController(t)
clusterStore := clusterDatastoreMocks.NewMockDataStore(ctrl)
nodeDatastore := nodeDatastoreMocks.NewMockDataStore(ctrl)
riskManager := riskManagerMocks.NewMockManager(ctrl)
enricher := nodesEnricherMocks.NewMockNodeEnricher(ctrl)

clusterID := "cluster-1"
nodeName := "node-name"
node := storage.Node{
Id: "node-id",
ClusterId: clusterID,
}
msg := &central.MsgFromSensor{
Msg: &central.MsgFromSensor_Event{
Event: &central.SensorEvent{
Action: central.ResourceAction_CREATE_RESOURCE,
Resource: &central.SensorEvent_NodeInventory{
NodeInventory: &storage.NodeInventory{
NodeId: node.GetId(),
NodeName: nodeName,
},
},
},
},
}
injector := &recordingInjectorWithCapabilities{
capabilities: map[centralsensor.SensorCapability]bool{},
}

gomock.InOrder(
nodeDatastore.EXPECT().GetNode(gomock.Any(), gomock.Eq(node.GetId())).Times(1).Return(&node, true, nil),
enricher.EXPECT().EnrichNodeWithVulnerabilities(gomock.Any(), gomock.Any(), nil).Times(1).Return(nil),
riskManager.EXPECT().CalculateRiskAndUpsertNode(gomock.Any()).Times(1).Return(nil),
)

p := &pipelineImpl{
clusterStore: clusterStore,
nodeDatastore: nodeDatastore,
enricher: enricher,
riskManager: riskManager,
}

err := p.Run(context.Background(), clusterID, msg, injector)
assert.NoError(t, err)
assert.Empty(t, injector.getSentSensorACKs(), "sensor ACK should be skipped without SensorACKSupport")

protoassert.SlicesEqual(t, []*central.NodeInventoryACK{
{
ClusterId: clusterID,
NodeName: nodeName,
Action: central.NodeInventoryACK_ACK,
MessageType: central.NodeInventoryACK_NodeInventory,
},
}, injector.getSentACKs(), "legacy ACKs")
}

var _ common.MessageInjector = (*recordingInjector)(nil)

type recordingInjector struct {
Expand All @@ -244,6 +306,15 @@ type recordingInjector struct {
sensor []*central.SensorACK
}

type recordingInjectorWithCapabilities struct {
recordingInjector
capabilities map[centralsensor.SensorCapability]bool
}

func (r *recordingInjectorWithCapabilities) HasCapability(cap centralsensor.SensorCapability) bool {
return r.capabilities[cap]
}

func (r *recordingInjector) InjectMessage(_ concurrency.Waitable, msg *central.MsgToSensor) error {
r.lock.Lock()
defer r.lock.Unlock()
Expand Down
Loading