Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions central/metrics/central.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,26 @@ var (
Name: "signature_verification_reprocessor_duration_seconds",
Help: "Duration of the signature verification reprocessor loop in seconds",
})

msgToSensorNotSentCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: metrics.PrometheusNamespace,
Subsystem: metrics.CentralSubsystem.String(),
Name: "msg_to_sensor_not_sent_count",
Help: "Total messages not sent to Sensor due to errors or other reasons",
}, []string{"ClusterID", "type", "reason"})
)

// Reasons for a message not being sent. These values populate the "reason"
// label of the msg_to_sensor_not_sent_count metric.
//
// Declared as constants: the values are fixed label strings and must never be
// reassigned at runtime.
const (
	// NotSentError indicates that an attempt was made to send the message
	// but an error was encountered.
	NotSentError = "error"
	// NotSentSignal indicates that a signal prevented the message from being
	// sent, such as a timeout.
	NotSentSignal = "signal"
	// NotSentSkip indicates that no attempt was made to send the message,
	// perhaps due to prior errors.
	NotSentSkip = "skip"
)

func startTimeToMS(t time.Time) float64 {
Expand Down Expand Up @@ -468,6 +488,16 @@ func SetReprocessorDuration(start time.Time) {
reprocessorDurationGauge.Set(time.Since(start).Seconds())
}

// IncrementMsgToSensorNotSentCounter increments the count of messages not sent to Sensor due to
// errors or other reasons.
//
// The counter is labeled with the destination cluster ID, the message's event
// type (without the proto prefix), and the supplied reason. A message whose
// inner Msg is nil is ignored; because the protobuf getter tolerates a nil
// receiver, this also makes the call safe when msg itself is nil.
func IncrementMsgToSensorNotSentCounter(clusterID string, msg *central.MsgToSensor, reason string) {
	inner := msg.GetMsg()
	if inner == nil {
		return
	}
	labels := prometheus.Labels{
		"ClusterID": clusterID,
		"type":      event.GetEventTypeWithoutPrefix(inner),
		"reason":    reason,
	}
	msgToSensorNotSentCounter.With(labels).Inc()
}

// SetSignatureVerificationReprocessorDuration registers how long a signature verification reprocessing step took.
func SetSignatureVerificationReprocessorDuration(start time.Time) {
signatureVerificationReprocessorDurationGauge.Set(time.Since(start).Seconds())
Expand Down
74 changes: 74 additions & 0 deletions central/metrics/central_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package metrics

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestIncrementMsgToSensorNotSentCounter verifies that the counter helper is
// nil-safe and that it records increments under the expected
// ClusterID/type/reason label combinations without cross-contaminating other
// label sets.
func TestIncrementMsgToSensorNotSentCounter(t *testing.T) {
	t.Run("no panic on nil msg", func(t *testing.T) {
		// The helper relies on the nil-tolerant protobuf getter, so a fully
		// nil message must be a no-op rather than a panic.
		assert.NotPanics(t, func() {
			IncrementMsgToSensorNotSentCounter("", nil, "")
		})
	})

	t.Run("no panic on nil inner msg", func(t *testing.T) {
		// A non-nil envelope with a nil inner Msg must likewise be ignored.
		assert.NotPanics(t, func() {
			IncrementMsgToSensorNotSentCounter("", &central.MsgToSensor{
				Msg: nil,
			}, "")
		})
	})

	t.Run("inc and extract type", func(t *testing.T) {
		// Clear any prior values.
		// Re-registering resets the vec so counts from other tests (or earlier
		// runs of this one) cannot leak into the assertions below.
		prometheus.Unregister(msgToSensorNotSentCounter)
		require.NoError(t, prometheus.Register(msgToSensorNotSentCounter))

		// Get references to the counters.
		// GetMetricWith creates each child metric at 0 if it does not exist.
		updImgErrCounter, err := msgToSensorNotSentCounter.GetMetricWith(
			prometheus.Labels{"ClusterID": "a", "type": "UpdatedImage", "reason": NotSentError},
		)
		require.NoError(t, err)
		updImgSkipCounter, err := msgToSensorNotSentCounter.GetMetricWith(
			prometheus.Labels{"ClusterID": "a", "type": "UpdatedImage", "reason": NotSentSkip},
		)
		require.NoError(t, err)
		reprocessDeploySignalCounter, err := msgToSensorNotSentCounter.GetMetricWith(
			prometheus.Labels{"ClusterID": "b", "type": "ReprocessDeployments", "reason": NotSentSignal},
		)
		require.NoError(t, err)

		// Sanity check.
		assert.Equal(t, 0.0, testutil.ToFloat64(updImgErrCounter))
		assert.Equal(t, 0.0, testutil.ToFloat64(updImgSkipCounter))
		assert.Equal(t, 0.0, testutil.ToFloat64(reprocessDeploySignalCounter))

		// Verify the count is incremented, type extracted, and reason recorded.
		// After each call, the untouched label combinations are re-asserted to
		// confirm only the targeted child metric moved.
		IncrementMsgToSensorNotSentCounter("a", &central.MsgToSensor{
			Msg: &central.MsgToSensor_UpdatedImage{},
		}, NotSentError)
		assert.Equal(t, 1.0, testutil.ToFloat64(updImgErrCounter))
		assert.Equal(t, 0.0, testutil.ToFloat64(updImgSkipCounter))
		assert.Equal(t, 0.0, testutil.ToFloat64(reprocessDeploySignalCounter))

		IncrementMsgToSensorNotSentCounter("a", &central.MsgToSensor{
			Msg: &central.MsgToSensor_UpdatedImage{},
		}, NotSentSkip)
		assert.Equal(t, 1.0, testutil.ToFloat64(updImgErrCounter))
		assert.Equal(t, 1.0, testutil.ToFloat64(updImgSkipCounter))
		assert.Equal(t, 0.0, testutil.ToFloat64(reprocessDeploySignalCounter))

		IncrementMsgToSensorNotSentCounter("b", &central.MsgToSensor{
			Msg: &central.MsgToSensor_ReprocessDeployments{},
		}, NotSentSignal)
		assert.Equal(t, 1.0, testutil.ToFloat64(updImgErrCounter))
		assert.Equal(t, 1.0, testutil.ToFloat64(updImgSkipCounter))
		assert.Equal(t, 1.0, testutil.ToFloat64(reprocessDeploySignalCounter))
	})
}
1 change: 1 addition & 0 deletions central/metrics/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ func init() {
signatureVerificationReprocessorDurationGauge,
pruningDurationHistogramVec,
storeCacheOperationHistogramVec,
msgToSensorNotSentCounter,
)
}
90 changes: 73 additions & 17 deletions central/reprocessor/reprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
imageEnricher "github.com/stackrox/rox/pkg/images/enricher"
"github.com/stackrox/rox/pkg/images/utils"
"github.com/stackrox/rox/pkg/logging"
"github.com/stackrox/rox/pkg/maputil"
nodeEnricher "github.com/stackrox/rox/pkg/nodes/enricher"
"github.com/stackrox/rox/pkg/sac"
"github.com/stackrox/rox/pkg/sac/resources"
Expand All @@ -37,6 +38,10 @@
"golang.org/x/sync/semaphore"
)

const (
imageReprocessorSemaphoreSize = int64(5)
)

var (
log = logging.LoggerForModule(administrationEvents.EnableAdministrationEvents())

Expand Down Expand Up @@ -141,6 +146,8 @@
signatureVerificationSig: concurrency.NewSignal(),

connManager: connManager,

injectMessageTimeoutDur: env.ReprocessInjectMessageTimeout.DurationSetting(),

Check warning on line 150 in central/reprocessor/reprocessor.go

View check run for this annotation

Codecov / codecov/patch

central/reprocessor/reprocessor.go#L149-L150

Added lines #L149 - L150 were not covered by tests
}
}

Expand Down Expand Up @@ -184,6 +191,8 @@
reprocessingInProgress concurrency.Flag

connManager connection.Manager

injectMessageTimeoutDur time.Duration
}

func (l *loopImpl) ReprocessRiskForDeployments(deploymentIDs ...string) {
Expand Down Expand Up @@ -356,16 +365,6 @@
return image, true
}

func (l *loopImpl) getActiveImageIDs() ([]string, error) {
query := search.NewQueryBuilder().AddStringsHighlighted(search.DeploymentID, search.WildcardString).ProtoQuery()
results, err := l.images.Search(allAccessCtx, query)
if err != nil {
return nil, errors.Wrap(err, "error searching for active image IDs")
}

return search.ResultsToIDs(results), nil
}

Comment thread
dcaravel marked this conversation as resolved.
func (l *loopImpl) reprocessImagesAndResyncDeployments(fetchOpt imageEnricher.FetchOption,
imgReprocessingFunc imageReprocessingFunc, imageQuery *v1.Query) {
if l.stopSig.IsDone() {
Expand All @@ -382,9 +381,10 @@
return
}

sema := semaphore.NewWeighted(5)
sema := semaphore.NewWeighted(imageReprocessorSemaphoreSize)
wg := concurrency.NewWaitGroup(0)
nReprocessed := atomic.NewInt32(0)
skipClusterIDs := maputil.NewSyncMap[string, struct{}]()
for _, result := range results {
wg.Add(1)
if err := sema.Acquire(concurrency.AsContext(&l.stopSig), 1); err != nil {
Expand All @@ -406,19 +406,39 @@
utils.FilterSuppressedCVEsNoClone(image)
utils.StripCVEDescriptionsNoClone(image)

// Send the updated image to relevant clusters.
for clusterID := range clusterIDs {
conn := l.connManager.GetConnection(clusterID)
if conn == nil {
continue
}
err := conn.InjectMessage(concurrency.AsContext(&l.stopSig), &central.MsgToSensor{

msg := &central.MsgToSensor{
Msg: &central.MsgToSensor_UpdatedImage{
UpdatedImage: image,
},
})
}

// If there were prior errors, do not attempt to send a message to this cluster.
if skipClusterIDs.Contains(clusterID) {
metrics.IncrementMsgToSensorNotSentCounter(clusterID, msg, metrics.NotSentSkip)
log.Debugw("Not sending updated image to cluster due to prior errors",
logging.ImageID(image.GetId()),
logging.ImageName(image.GetName().GetFullName()),
logging.String("dst_cluster", clusterID),
)
continue
}

err := l.injectMessage(concurrency.AsContext(&l.stopSig), conn, msg)
if err != nil {
log.Errorw("Error sending updated image to sensor "+clusterID,
logging.ImageName(image.GetName().GetFullName()), logging.ImageID(image.GetId()), logging.Err(err))
skipClusterIDs.Store(clusterID, struct{}{})
log.Errorw("Error sending updated image to cluster, skipping cluster until next reprocessing cycle",
logging.ImageName(image.GetName().GetFullName()),
logging.ImageID(image.GetId()), logging.Err(err),
// Not using logging.ClusterID() to avoid "duplicate resource ID field found" panic
logging.String("dst_cluster", clusterID),
)
}
}
}(result.ID, clusterIDSet)
Expand All @@ -434,14 +454,50 @@
// Once the images have been rescanned, then reprocess the deployments.
// This should not take a particularly long period of time.
if !l.stopSig.IsDone() {
l.connManager.BroadcastMessage(&central.MsgToSensor{
msg := &central.MsgToSensor{
Msg: &central.MsgToSensor_ReprocessDeployments{
ReprocessDeployments: &central.ReprocessDeployments{},
},
})
}
ctx := concurrency.AsContext(&l.stopSig)
for _, conn := range l.connManager.GetActiveConnections() {
clusterID := conn.ClusterID()
if skipClusterIDs.Contains(clusterID) {
metrics.IncrementMsgToSensorNotSentCounter(clusterID, msg, metrics.NotSentSkip)
log.Errorw("Not sending reprocess deployments to cluster due to prior errors",
logging.ClusterID(clusterID),
)
continue
}

err := l.injectMessage(ctx, conn, msg)
if err != nil {
log.Errorw("Error sending reprocess deployments message to cluster",
logging.ClusterID(clusterID),
logging.Err(err),
)
}

Check warning on line 479 in central/reprocessor/reprocessor.go

View check run for this annotation

Codecov / codecov/patch

central/reprocessor/reprocessor.go#L475-L479

Added lines #L475 - L479 were not covered by tests
}
}
}

// injectMessage will inject a message onto connection, an error will be returned if the
// injection fails for any reason, including timeout.
func (l *loopImpl) injectMessage(ctx context.Context, conn connection.SensorConnection, msg *central.MsgToSensor) error {
if l.injectMessageTimeoutDur > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, l.injectMessageTimeoutDur)
defer cancel()
}
Comment thread
dcaravel marked this conversation as resolved.

err := conn.InjectMessage(ctx, msg)
if err != nil {
return errors.Wrap(err, "injecting message to sensor")
}

return nil
}

func (l *loopImpl) reprocessNode(id string) bool {
node, exists, err := l.nodes.GetNode(allAccessCtx, id)
if err != nil {
Expand Down
51 changes: 0 additions & 51 deletions central/reprocessor/reprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,72 +7,21 @@ import (
"testing"
"time"

deploymentDatastore "github.com/stackrox/rox/central/deployment/datastore"
imageDatastore "github.com/stackrox/rox/central/image/datastore"
imagePG "github.com/stackrox/rox/central/image/datastore/store/postgres"
imagePostgresV2 "github.com/stackrox/rox/central/image/datastore/store/v2/postgres"
platformmatcher "github.com/stackrox/rox/central/platform/matcher"
"github.com/stackrox/rox/central/ranking"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/features"
"github.com/stackrox/rox/pkg/fixtures"
"github.com/stackrox/rox/pkg/postgres"
"github.com/stackrox/rox/pkg/postgres/pgtest"
"github.com/stackrox/rox/pkg/process/filter"
"github.com/stackrox/rox/pkg/protocompat"
"github.com/stackrox/rox/pkg/sac"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestGetActiveImageIDs(t *testing.T) {
t.Parallel()

testCtx := sac.WithAllAccess(context.Background())

var (
pool postgres.DB
imageDS imageDatastore.DataStore
deploymentsDS deploymentDatastore.DataStore
err error
)

testingDB := pgtest.ForT(t)
pool = testingDB.DB
defer pool.Close()

mockCtrl := gomock.NewController(t)
if features.FlattenCVEData.Enabled() {
imageDS = imageDatastore.NewWithPostgres(imagePostgresV2.New(pool, false, concurrency.NewKeyFence()), nil, ranking.ImageRanker(), ranking.ComponentRanker())
} else {
imageDS = imageDatastore.NewWithPostgres(imagePG.New(pool, false, concurrency.NewKeyFence()), nil, ranking.ImageRanker(), ranking.ComponentRanker())
}
deploymentsDS, err = deploymentDatastore.New(pool, nil, nil, nil, nil, nil, filter.NewFilter(5, 5, []int{5}), ranking.NewRanker(), ranking.NewRanker(), ranking.NewRanker(), platformmatcher.GetTestPlatformMatcherWithDefaultPlatformComponentConfig(mockCtrl))
require.NoError(t, err)

loop := NewLoop(nil, nil, nil, deploymentsDS, imageDS, nil, nil, nil, nil).(*loopImpl)

ids, err := loop.getActiveImageIDs()
require.NoError(t, err)
require.Equal(t, 0, len(ids))

deployment := fixtures.GetDeployment()
require.NoError(t, deploymentsDS.UpsertDeployment(testCtx, deployment))

images := fixtures.DeploymentImages()
imageIDs := make([]string, 0, len(images))
for _, image := range images {
require.NoError(t, imageDS.UpsertImage(testCtx, image))
imageIDs = append(imageIDs, image.GetId())
}

ids, err = loop.getActiveImageIDs()
require.NoError(t, err)
require.ElementsMatch(t, imageIDs, ids)
}

func TestImagesWithSignaturesQuery(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading