Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion central/compliance/manager/manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ func (m *manager) createDomain(ctx context.Context, clusterID string) (framework
return nil, errors.Wrapf(err, "retrieving nodes for cluster %s", clusterID)
}

deployments, err := m.deploymentStore.SearchRawDeployments(ctx, clusterQuery)
activeDeploymentQuery := search.ConjunctionQuery(clusterQuery, datastore.ActiveDeploymentsQuery())
deployments, err := m.deploymentStore.SearchRawDeployments(ctx, activeDeploymentQuery)
if err != nil {
return nil, errors.Wrapf(err, "could not get deployments for cluster %s", clusterID)
}
Expand Down
11 changes: 9 additions & 2 deletions central/deployment/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,28 @@ type DataStore interface {

GetDeployment(ctx context.Context, id string) (*storage.Deployment, bool, error)
GetDeployments(ctx context.Context, ids []string) ([]*storage.Deployment, error)
CountDeployments(ctx context.Context) (int, error)
// UpsertDeployment adds or updates a deployment. If the deployment exists, the tags in the deployment are taken from
// the stored deployment.
UpsertDeployment(ctx context.Context, deployment *storage.Deployment) error

RemoveDeployment(ctx context.Context, clusterID, id string) error

GetImagesForDeployment(ctx context.Context, deployment *storage.Deployment) ([]*storage.Image, error)
GetDeploymentIDs(ctx context.Context) ([]string, error)
GetDeploymentIDs(ctx context.Context, q *v1.Query) ([]string, error)

WalkByQuery(ctx context.Context, query *v1.Query, fn func(deployment *storage.Deployment) error) error

GetContainerImageViews(ctx context.Context, q *v1.Query) ([]*views.ContainerImageView, error)
}

// ActiveDeploymentsQuery returns a query that filters for active deployments only.
// This is a convenience function to avoid repeating the same query construction throughout the codebase.
func ActiveDeploymentsQuery() *v1.Query {
return pkgSearch.NewQueryBuilder().
AddExactMatches(pkgSearch.DeploymentState, storage.DeploymentState_STATE_ACTIVE.String()).
ProtoQuery()
}

func newDataStore(
storage store.Store,
images imageDS.DataStore,
Expand Down
60 changes: 45 additions & 15 deletions central/deployment/datastore/datastore_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
imageUtils "github.com/stackrox/rox/pkg/images/utils"
"github.com/stackrox/rox/pkg/kubernetes"
"github.com/stackrox/rox/pkg/process/filter"
"github.com/stackrox/rox/pkg/protocompat"
"github.com/stackrox/rox/pkg/sac"
"github.com/stackrox/rox/pkg/sac/resources"
pkgSearch "github.com/stackrox/rox/pkg/search"
Expand Down Expand Up @@ -95,8 +96,9 @@ func (ds *datastoreImpl) initializeRanker() {

clusterScores := make(map[string]float32)
nsScores := make(map[string]float32)
// The store search function does not use select fields, only views do. Hence empty query is used in the walk below
err := ds.deploymentStore.WalkByQuery(readCtx, pkgSearch.EmptyQuery(), func(deployment *storage.Deployment) error {
// The store search function does not use select fields, only views do.
// Filter to active deployments only — deleted deployments should not contribute to risk rankings.
err := ds.deploymentStore.WalkByQuery(readCtx, ActiveDeploymentsQuery(), func(deployment *storage.Deployment) error {
riskScore := deployment.GetRiskScore()
ds.deploymentRanker.Add(deployment.GetId(), riskScore)

Expand Down Expand Up @@ -245,14 +247,6 @@ func (ds *datastoreImpl) GetDeployments(ctx context.Context, ids []string) ([]*s
return deployments, nil
}

// CountDeployments
func (ds *datastoreImpl) CountDeployments(ctx context.Context) (int, error) {
if _, err := deploymentsSAC.ReadAllowed(ctx); err != nil {
return 0, err
}
return ds.Count(ctx, pkgSearch.EmptyQuery())
}

func (ds *datastoreImpl) WalkByQuery(ctx context.Context, query *v1.Query, fn func(deployment *storage.Deployment) error) error {
wrappedFn := func(deployment *storage.Deployment) error {
ds.updateDeploymentPriority(deployment)
Expand Down Expand Up @@ -397,15 +391,47 @@ func (ds *datastoreImpl) RemoveDeployment(ctx context.Context, clusterID, id str
errorList.AddError(err)
}

// Delete should be last to ensure that the above is always cleaned up even in the case of crash
// TODO: fix me
// Soft delete: set deleted_at and transition state to STATE_DELETED.
// Also capture deployment data to update rankers after deletion.
var riskScore float32
var namespaceID string
var foundDeployment bool

err = ds.keyedMutex.DoStatusWithLock(id, func() error {
if err := ds.deploymentStore.Delete(ctx, id); err != nil {
deployment, exists, err := ds.deploymentStore.Get(ctx, id)
if err != nil {
return err
}
return nil
if !exists {
return nil
}
// Store deployment data for ranker updates after successful deletion.
foundDeployment = true
riskScore = deployment.GetRiskScore()
namespaceID = deployment.GetNamespaceId()

deployment.DeletedAt = protocompat.TimestampNow()
deployment.State = storage.DeploymentState_STATE_DELETED
return ds.deploymentStore.Upsert(ctx, deployment)
})
if err != nil {
errorList.AddError(err)
} else if foundDeployment {
// Successfully soft-deleted: remove from rankers.
ds.deploymentRanker.Remove(id)

// Update namespace ranker by subtracting this deployment's risk score.
if ds.nsRanker != nil {
oldNSScore := ds.nsRanker.GetScoreForID(namespaceID)
ds.nsRanker.Add(namespaceID, oldNSScore-riskScore)
}

// Update cluster ranker by subtracting this deployment's risk score.
if ds.clusterRanker != nil {
oldClusterScore := ds.clusterRanker.GetScoreForID(clusterID)
ds.clusterRanker.Add(clusterID, oldClusterScore-riskScore)
}
}

return errorList.ToError()
Expand Down Expand Up @@ -479,8 +505,12 @@ func (ds *datastoreImpl) updateDeploymentPriority(deployments ...*storage.Deploy
}
}

func (ds *datastoreImpl) GetDeploymentIDs(ctx context.Context) ([]string, error) {
return ds.deploymentStore.GetIDs(ctx)
func (ds *datastoreImpl) GetDeploymentIDs(ctx context.Context, q *v1.Query) ([]string, error) {
results, err := ds.deploymentStore.Search(ctx, q)
if err != nil {
return nil, err
}
return pkgSearch.ResultsToIDs(results), nil
}

func (ds *datastoreImpl) GetContainerImageViews(ctx context.Context, q *v1.Query) ([]*views.ContainerImageView, error) {
Expand Down
23 changes: 4 additions & 19 deletions central/deployment/datastore/mocks/datastore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions central/deployment/datastore/test/datastore_sac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (s *deploymentDatastoreSACSuite) TestGetDeploymentIDs() {
for name, c := range cases {
s.Run(name, func() {
ctx := s.testContexts[c.ScopeKey]
fetchedIDs, getErr := s.datastore.GetDeploymentIDs(ctx)
fetchedIDs, getErr := s.datastore.GetDeploymentIDs(ctx, searchPkg.EmptyQuery())
s.Require().NoError(getErr)
// Note: the behaviour change may impact policy dry runs if the requester does not have full namespace scope.
s.ElementsMatch(fetchedIDs, c.ExpectedDeploymentIDs)
Expand Down Expand Up @@ -499,7 +499,7 @@ func (s *deploymentDatastoreSACSuite) TestUnrestrictedCount() {

func (s *deploymentDatastoreSACSuite) runTestCountDeployments(testCase testutils.SACSearchTestCase) {
ctx := s.testContexts[testCase.ScopeKey]
resultCount, err := s.datastore.CountDeployments(ctx)
resultCount, err := s.datastore.Count(ctx, searchPkg.EmptyQuery())
s.NoError(err)
expectedResultCount := testutils.AggregateCounts(s.T(), testCase.Results)
s.Equal(expectedResultCount, resultCount)
Expand Down
17 changes: 10 additions & 7 deletions central/deployment/views/list_deployment_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
// This view is used to populate ListDeployment protos from database queries.
// The db tags use search field labels (lowercase with underscores), not database column names.
type ListDeploymentView struct {
ID string `db:"deployment_id"`
Hash uint64 `db:"deployment_hash"`
Name string `db:"deployment"`
ClusterName string `db:"cluster"`
ClusterID string `db:"cluster_id"`
Namespace string `db:"namespace"`
Created *time.Time `db:"created"`
ID string `db:"deployment_id"`
Hash uint64 `db:"deployment_hash"`
Name string `db:"deployment"`
ClusterName string `db:"cluster"`
ClusterID string `db:"cluster_id"`
Namespace string `db:"namespace"`
Created *time.Time `db:"created"`
State storage.DeploymentState `db:"deployment_state"`
// Priority is NOT selected from DB - it's computed by the ranker
}

Expand All @@ -33,6 +34,7 @@ func (v *ListDeploymentView) ToListDeployment() *storage.ListDeployment {
ClusterId: v.ClusterID,
Namespace: v.Namespace,
Created: protocompat.ConvertTimeToTimestampOrNil(v.Created),
State: v.State,
// Priority is set by updateListDeploymentPriority in the datastore layer
}
}
Expand All @@ -47,5 +49,6 @@ func ListDeploymentViewSelects() []*v1.QuerySelect {
search.NewQuerySelect(search.ClusterID).Proto(),
search.NewQuerySelect(search.Namespace).Proto(),
search.NewQuerySelect(search.Created).Proto(),
search.NewQuerySelect(search.DeploymentState).Proto(),
}
}
4 changes: 2 additions & 2 deletions central/detection/lifecycle/manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (m *managerImpl) copyAndResetIndicatorQueue() map[string]*storage.ProcessIn

func (m *managerImpl) buildIndicatorFilter() {
ctx := sac.WithAllAccess(context.Background())
deploymentIDs, err := m.deploymentDataStore.GetDeploymentIDs(ctx)
deploymentIDs, err := m.deploymentDataStore.GetDeploymentIDs(ctx, deploymentDatastore.ActiveDeploymentsQuery())
if err != nil {
utils.Should(errors.Wrap(err, "error getting deployment IDs"))
return
Expand Down Expand Up @@ -429,7 +429,7 @@ func (m *managerImpl) filterOutDisabledPolicies(alerts *[]*storage.Alert) {
*alerts = filteredAlerts
}

// HandleDeploymentAlerts handles the lifecycle of the provided alerts (including alerting, merging, etc) all of which belong to the specified deployment
// HandleDeploymentAlerts handles the lifecycle of the provided alerts (including alerting, merging, etc) all of which belong to the specified deployment.
func (m *managerImpl) HandleDeploymentAlerts(deploymentID string, alerts []*storage.Alert, stage storage.LifecycleStage) error {
defer m.reprocessor.ReprocessRiskForDeployments(deploymentID)

Expand Down
12 changes: 7 additions & 5 deletions central/detection/runtime/detector_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/stackrox/rox/central/deployment/datastore"
"github.com/stackrox/rox/central/detection"
"github.com/stackrox/rox/generated/storage"
detectionPkg "github.com/stackrox/rox/pkg/detection"
"github.com/stackrox/rox/pkg/sac"
)
Expand Down Expand Up @@ -34,9 +35,10 @@ func (d *detectorImpl) DeploymentWhitelistedForPolicy(deploymentID, policyID str
if err != nil {
return err
}
if !exists {
// Assume it's not excluded if it doesn't exist, otherwise runtime alerts for deleted deployments
// will always get removed every time we update a policy.
if !exists || dep.GetState() == storage.DeploymentState_STATE_DELETED {
// Assume it's not excluded if it doesn't exist or is soft-deleted,
// otherwise runtime alerts for deleted deployments will always get
// removed every time we update a policy.
result = false
return nil
}
Expand All @@ -50,10 +52,10 @@ func (d *detectorImpl) DeploymentWhitelistedForPolicy(deploymentID, policyID str
}

func (d *detectorImpl) DeploymentInactive(deploymentID string) bool {
_, exists, err := d.deployments.ListDeployment(detectorCtx, deploymentID)
dep, exists, err := d.deployments.ListDeployment(detectorCtx, deploymentID)
if err != nil {
log.Errorf("Couldn't determine inactive state of deployment %q: %v", deploymentID, err)
return false
}
return !exists
return !exists || dep.GetState() == storage.DeploymentState_STATE_DELETED
}
5 changes: 3 additions & 2 deletions central/graphql/resolvers/loaders/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
deploymentsView "github.com/stackrox/rox/central/views/deployments"
v1 "github.com/stackrox/rox/generated/api/v1"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/search"
"github.com/stackrox/rox/pkg/sync"
)

Expand Down Expand Up @@ -94,9 +95,9 @@ func (idl *deploymentLoaderImpl) CountFromQuery(ctx context.Context, query *v1.Q
return int32(count), nil
}

// CountFromQuery returns the total number of deployments.
// CountAll returns the total number of deployments.
func (idl *deploymentLoaderImpl) CountAll(ctx context.Context) (int32, error) {
count, err := idl.ds.CountDeployments(ctx)
count, err := idl.ds.Count(ctx, search.EmptyQuery())
return int32(count), err
}

Expand Down
3 changes: 2 additions & 1 deletion central/graphql/resolvers/vulnerabilities_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/graph-gophers/graphql-go"
"github.com/pkg/errors"
"github.com/stackrox/rox/central/cve/converter/utils"
deploymentDS "github.com/stackrox/rox/central/deployment/datastore"
"github.com/stackrox/rox/central/graphql/resolvers/loaders"
"github.com/stackrox/rox/central/metrics"
v1 "github.com/stackrox/rox/generated/api/v1"
Expand Down Expand Up @@ -322,7 +323,7 @@ func (evr *EmbeddedVulnerabilityResolver) getEnvImpactComponentsForPerClusterVul
}

func (evr *EmbeddedVulnerabilityResolver) getEnvImpactComponentsForImages(ctx context.Context) (numerator, denominator int, err error) {
allDepsCount, err := evr.root.DeploymentDataStore.CountDeployments(ctx)
allDepsCount, err := evr.root.DeploymentDataStore.Count(ctx, deploymentDS.ActiveDeploymentsQuery())
if err != nil {
return 0, 0, err
}
Expand Down
3 changes: 1 addition & 2 deletions central/metrics/custom/image_vulnerabilities/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
deploymentDS "github.com/stackrox/rox/central/deployment/datastore"
"github.com/stackrox/rox/central/metrics/custom/tracker"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/search"
)

func New(ds deploymentDS.DataStore) *tracker.TrackerBase[*finding] {
Expand All @@ -24,7 +23,7 @@ func track(ctx context.Context, ds deploymentDS.DataStore) tracker.FindingErrorS
return func(yield func(*finding, error) bool) {
var f finding
collector := tracker.NewFindingCollector(yield)
collector.Finally(ds.WalkByQuery(ctx, search.EmptyQuery(), func(deployment *storage.Deployment) error {
collector.Finally(ds.WalkByQuery(ctx, deploymentDS.ActiveDeploymentsQuery(), func(deployment *storage.Deployment) error {
f.deployment = deployment
images, err := ds.GetImagesForDeployment(ctx, deployment)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions central/networkgraph/service/service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func (s *serviceImpl) getNetworkGraph(ctx context.Context, request *v1.NetworkGr
if err != nil {
return nil, err
}
deploymentQuery = search.ConjunctionQuery(deploymentQuery, deploymentDS.ActiveDeploymentsQuery())

count, err := s.deployments.Count(ctx, deploymentQuery)
if err != nil {
Expand Down Expand Up @@ -763,6 +764,7 @@ func (s *serviceImpl) getExternalFlowsAndEntitiesByQuery(ctx context.Context, cl
if err != nil {
return nil, nil, errors.Wrap(err, "failed to construct filter and scope queries")
}
deploymentQuery = search.ConjunctionQuery(deploymentQuery, deploymentDS.ActiveDeploymentsQuery())

count, err := s.deployments.Count(ctx, deploymentQuery)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion central/networkpolicies/generator/generator_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (g *generator) generateGraph(ctx context.Context, clusterID string, query *
}

clusterIDQuery := search.NewQueryBuilder().AddExactMatches(search.ClusterID, clusterID).ProtoQuery()
deploymentsQuery := clusterIDQuery
deploymentsQuery := search.ConjunctionQuery(clusterIDQuery, dDS.ActiveDeploymentsQuery())
if query.GetQuery() != nil {
deploymentsQuery = search.ConjunctionQuery(deploymentsQuery, query)
}
Expand Down
4 changes: 2 additions & 2 deletions central/platform/reprocessor/reprocessor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ func (pr *platformReprocessorImpl) reprocessAlerts() error {
func (pr *platformReprocessorImpl) reprocessDeployments() error {
var q *v1.Query
if pr.customized {
q = search.EmptyQuery()
q = deploymentDS.ActiveDeploymentsQuery()
} else {
q = unsetPlatformComponentQuery
q = search.ConjunctionQuery(unsetPlatformComponentQuery, deploymentDS.ActiveDeploymentsQuery())
}
q.Pagination = &v1.QueryPagination{
Limit: batchSize,
Expand Down
2 changes: 1 addition & 1 deletion central/policy/service/service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (s *serviceImpl) predicateBasedDryRunPolicy(ctx context.Context, cancelCtx
return nil, errors.Wrapf(errox.InvalidArgs, "invalid policy: %v", err)
}

deploymentIds, err := s.deployments.GetDeploymentIDs(ctx)
deploymentIds, err := s.deployments.GetDeploymentIDs(ctx, deploymentDataStore.ActiveDeploymentsQuery())
if err != nil {
return nil, err
}
Expand Down
Loading
Loading