Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion central/image/service/service_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stackrox/rox/pkg/images/utils"
"github.com/stackrox/rox/pkg/search"
"github.com/stackrox/rox/pkg/search/paginated"
"github.com/stackrox/rox/pkg/set"
"github.com/stackrox/rox/pkg/timestamp"
pkgUtils "github.com/stackrox/rox/pkg/utils"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -228,7 +229,23 @@ func (s *serviceImpl) ScanImageInternal(ctx context.Context, request *v1.ScanIma
}

img := types.ToImage(request.GetImage())
if _, err := s.enricher.EnrichImage(ctx, enricher.EnrichmentContext{FetchOpt: fetchOpt, Internal: true}, img); err != nil {

var source *enricher.RequestSource
if request.GetSource() != nil {
source = &enricher.RequestSource{
ClusterID: request.GetSource().GetClusterId(),
Namespace: request.GetSource().GetNamespace(),
ImagePullSecrets: set.NewStringSet(request.GetSource().GetImagePullSecrets()...),
}
}

enrichmentContext := enricher.EnrichmentContext{
FetchOpt: fetchOpt,
Internal: true,
Source: source,
}

if _, err := s.enricher.EnrichImage(ctx, enrichmentContext, img); err != nil {
log.Errorf("error enriching image %q: %v", request.GetImage().GetName().GetFullName(), err)
// purposefully, don't return here because we still need to save it into the DB so there is a reference
// even if we weren't able to enrich it
Expand Down
4 changes: 3 additions & 1 deletion central/imageintegration/datastore/datastore_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ func (ds *datastoreImpl) AddImageIntegration(ctx context.Context, integration *s
return "", sac.ErrResourceAccessDenied
}

integration.Id = uuid.NewV4().String()
if integration.GetId() == "" {
integration.Id = uuid.NewV4().String()
}
err := ds.storage.Upsert(ctx, integration)
if err != nil {
return "", err
Expand Down
158 changes: 121 additions & 37 deletions central/sensor/service/pipeline/imageintegrations/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ import (
countMetrics "github.com/stackrox/rox/central/metrics"
"github.com/stackrox/rox/central/reprocessor"
"github.com/stackrox/rox/central/sensor/service/common"
"github.com/stackrox/rox/central/sensor/service/connection"
"github.com/stackrox/rox/central/sensor/service/pipeline"
"github.com/stackrox/rox/central/sensor/service/pipeline/reconciliation"
v1 "github.com/stackrox/rox/generated/api/v1"
"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/centralsensor"
"github.com/stackrox/rox/pkg/env"
"github.com/stackrox/rox/pkg/errox"
"github.com/stackrox/rox/pkg/logging"
"github.com/stackrox/rox/pkg/metrics"
"github.com/stackrox/rox/pkg/set"
"github.com/stackrox/rox/pkg/tlscheck"
"github.com/stackrox/rox/pkg/urlfmt"
)
Expand All @@ -35,7 +39,8 @@ func GetPipeline() pipeline.Fragment {
return NewPipeline(enrichment.ManagerSingleton(),
datastore.Singleton(),
clusterDatastore.Singleton(),
reprocessor.Singleton())
reprocessor.Singleton(),
)
}

// NewPipeline returns a new instance of Pipeline.
Expand All @@ -59,9 +64,35 @@ type pipelineImpl struct {
enrichAndDetectLoop reprocessor.Loop
}

func (s *pipelineImpl) Reconcile(_ context.Context, _ string, _ *reconciliation.StoreMap) error {
// Nothing to reconcile for image integrations
return nil
func (s *pipelineImpl) Reconcile(ctx context.Context, clusterID string, storeMap *reconciliation.StoreMap) error {
existingIDs := set.NewStringSet()

conn := connection.FromContext(ctx)

// We should not reconcile image integrations unless the Sensor is on a version that can provide scoped integrations
// with consistent uuids
if !conn.HasCapability(centralsensor.ScopedImageIntegrations) {
return nil
}

integrations, err := s.datastore.GetImageIntegrations(ctx, &v1.GetImageIntegrationsRequest{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can already filter the integrations by cluster here instead of within line 90.

Suggested change
integrations, err := s.datastore.GetImageIntegrations(ctx, &v1.GetImageIntegrationsRequest{})
integrations, err := s.datastore.GetImageIntegrations(ctx, &v1.GetImageIntegrationsRequest{Cluster: clusterID})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, so long story, this doesn't work. I'll fix it in a follow up

if err != nil {
return errors.Wrap(err, "getting image integrations for reconciliation")
}
for _, integration := range integrations {
// Skipping ECR image integrations since they are special and use AWS IAM.
if integration.GetEcr() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity: why do we skip ECR integrations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ECR autogenerated integrations are special and use AWS IAM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment here, then?

continue
}
if integration.GetClusterId() != clusterID {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at v1.GetImageIntegrationsRequest and it seems that it supports specifying a cluster (name only, not ID?). However, the handling of that field is extremely weird (`central/imageintegration/datastore/datastore_impl.go:55-64):

	integrations, err := ds.storage.GetAll(ctx)
	if err != nil {
		return nil, err
	}

	integrationSlice := integrations[:0]
	for _, integration := range integrations {
		if request.GetCluster() != "" {
			continue
		}

So if request specifies a non-empty cluster, it will always return an empty slice, while still doing a read on the DB. Not sure if there's a bug or this param was just never supported, but either way, this probably deserves a JIRA filed for it and addressed some time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, seems like the original PR #658 was actually removing support for this parameter (but apparently, this wasn't done because there was still something left for autogenerated image integrations that uses this parameter).

I do agree, it does justify a JIRA, I created one: ROX-13071

continue
}
existingIDs.Add(integration.GetId())
}
store := storeMap.Get((*central.SensorEvent_ImageIntegration)(nil))
return reconciliation.Perform(store, existingIDs, "imageintegrations", func(id string) error {
return s.datastore.RemoveImageIntegration(ctx, id)
})
}

func (s *pipelineImpl) Match(msg *central.MsgFromSensor) bool {
Expand Down Expand Up @@ -139,11 +170,13 @@ func setUpIntegrationParams(ctx context.Context, imageIntegration *storage.Image
switch config := imageIntegration.GetIntegrationConfig().(type) {
case *storage.ImageIntegration_Docker:
dockerCfg := config.Docker
var validTLS bool
validTLS, err = tlscheck.CheckTLS(ctx, dockerCfg.GetEndpoint())
if err != nil {
err = errors.Wrapf(err, "reaching out for TLS check to %s", dockerCfg.GetEndpoint())
return
validTLS, tlsErr := tlscheck.CheckTLS(ctx, dockerCfg.GetEndpoint())
if tlsErr != nil {
log.Debugf("reaching out for TLS check to %s: %v", dockerCfg.GetEndpoint(), err)
// Not enough evidence that we can skip TLS, so conservatively require it.
dockerCfg.Insecure = false
} else {
dockerCfg.Insecure = !validTLS
}
dockerCfg.Insecure = !validTLS
dockerCfg.Endpoint = parseEndpointForURL(dockerCfg.GetEndpoint())
Expand All @@ -161,34 +194,7 @@ func setUpIntegrationParams(ctx context.Context, imageIntegration *storage.Image
return
}

// Run runs the pipeline template on the input and returns the output.
// Action is currently always update.
func (s *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.MsgFromSensor, _ common.MessageInjector) error {
// Ignore autogenerated registries if they are disabled
if autogeneratedRegistriesDisabled {
return nil
}

defer countMetrics.IncrementResourceProcessedCounter(pipeline.ActionToOperation(msg.GetEvent().GetAction()), metrics.ImageIntegration)

// Extract the cluster name.
clusterName, exists, err := s.clusterDatastore.GetClusterName(ctx, clusterID)
if err != nil {
return errors.Wrapf(err, "error getting cluster name for cluster ID: %s", clusterID)
}
if !exists {
return fmt.Errorf("cluster with id %q does not exist", clusterID)
}

// Set up the integration and update its parameters.
imageIntegration := msg.GetEvent().GetImageIntegration()
description, matches, err := setUpIntegrationParams(ctx, imageIntegration)
if err != nil {
return err
}
imageIntegration.Name = fmt.Sprintf("Autogenerated %s for cluster %s", description, clusterName)
imageIntegration.ClusterId = clusterID

func (s *pipelineImpl) legacyRun(ctx context.Context, imageIntegration *storage.ImageIntegration, matches matchingFunc) error {
// Fetch existing integration and determine if we should update by matching its configuration type.
existingIntegrations, err := s.datastore.GetImageIntegrations(ctx, &v1.GetImageIntegrationsRequest{})
if err != nil {
Expand Down Expand Up @@ -226,4 +232,82 @@ func (s *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.M
return nil
}

func (s *pipelineImpl) runRemove(ctx context.Context, id string) error {
if err := s.integrationManager.Remove(id); err != nil {
return errors.Wrap(err, "removing integration from manager")
}
return s.datastore.RemoveImageIntegration(ctx, id)
}

// Run runs the pipeline template on the input and returns the output.
// Action is currently always update.
func (s *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.MsgFromSensor, _ common.MessageInjector) error {
// Ignore autogenerated registries if they are disabled
if autogeneratedRegistriesDisabled {
return nil
}
defer countMetrics.IncrementResourceProcessedCounter(pipeline.ActionToOperation(msg.GetEvent().GetAction()), metrics.ImageIntegration)

if msg.GetEvent().GetAction() == central.ResourceAction_REMOVE_RESOURCE {
return s.runRemove(ctx, msg.GetEvent().GetImageIntegration().GetId())
}

// Extract the cluster name.
clusterName, exists, err := s.clusterDatastore.GetClusterName(ctx, clusterID)
if err != nil {
return errors.Wrapf(err, "error getting cluster name for cluster ID: %s", clusterID)
}
if !exists {
return errox.NotFound.Newf("cluster with id %q does not exist", clusterID)
}

// Set up the integration and update its parameters.
imageIntegration := msg.GetEvent().GetImageIntegration()
imageIntegration.ClusterId = clusterID

description, matches, err := setUpIntegrationParams(ctx, imageIntegration)
if err != nil {
return errors.Wrapf(err, "setting up integration params for %q", imageIntegration.GetId())
}
source := imageIntegration.GetSource()
if source == nil {
return s.legacyRun(ctx, imageIntegration, matches)
}
imageIntegration.Name = fmt.Sprintf("Autogenerated %s for cluster %s from %s/%s",
description, clusterName, source.GetNamespace(), source.GetImagePullSecretName())

integrationExisted, err := s.upsertImageIntegration(ctx, imageIntegration)
if err != nil {
return err
}

// Currently, we will only trigger a reprocess of deployments when an image integration is newly added instead of
// updated. Reasoning behind this is that we currently do not have a way of scoping a reprocessing to specific
// deployments based on properties (i.e. deployments from the same cluster, deployments using a specific image
// integration). This combined with the frequent updates of image integrations on OpenShift and the fact that
// deployments will be reprocessed in ~1hr, we will skip reprocessing on updates for now.
if !integrationExisted {
s.enrichAndDetectLoop.ShortCircuit()
}

return nil
}

func (s *pipelineImpl) OnFinish(_ string) {}

// upsertImageIntegration will add the given image integration within the datastore.
// It will return any errors during the creation and whether the integration existed previously.
func (s *pipelineImpl) upsertImageIntegration(ctx context.Context, imageIntegration *storage.ImageIntegration) (existed bool, err error) {
_, existed, err = s.datastore.GetImageIntegration(ctx, imageIntegration.GetId())
if err != nil {
return existed, errors.Wrapf(err, "retrieving image integration %q", imageIntegration.GetId())
}

if _, err := s.datastore.AddImageIntegration(ctx, imageIntegration); err != nil {
return existed, errors.Wrapf(err, "adding image integration %q", imageIntegration.GetId())
}
if err := s.integrationManager.Upsert(imageIntegration); err != nil {
return existed, errors.Wrapf(err, "notifying of update for image integration %q", imageIntegration.GetId())
}
return existed, nil
}
44 changes: 22 additions & 22 deletions generated/api/v1/compliance_service.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion generated/api/v1/image_integration_service.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading