-
Notifications
You must be signed in to change notification settings - Fork 171
ROX-11657: Scope autogenerated image integrations by the image pull secrets #2572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,14 +12,18 @@ import ( | |
| countMetrics "github.com/stackrox/rox/central/metrics" | ||
| "github.com/stackrox/rox/central/reprocessor" | ||
| "github.com/stackrox/rox/central/sensor/service/common" | ||
| "github.com/stackrox/rox/central/sensor/service/connection" | ||
| "github.com/stackrox/rox/central/sensor/service/pipeline" | ||
| "github.com/stackrox/rox/central/sensor/service/pipeline/reconciliation" | ||
| v1 "github.com/stackrox/rox/generated/api/v1" | ||
| "github.com/stackrox/rox/generated/internalapi/central" | ||
| "github.com/stackrox/rox/generated/storage" | ||
| "github.com/stackrox/rox/pkg/centralsensor" | ||
| "github.com/stackrox/rox/pkg/env" | ||
| "github.com/stackrox/rox/pkg/errox" | ||
| "github.com/stackrox/rox/pkg/logging" | ||
| "github.com/stackrox/rox/pkg/metrics" | ||
| "github.com/stackrox/rox/pkg/set" | ||
| "github.com/stackrox/rox/pkg/tlscheck" | ||
| "github.com/stackrox/rox/pkg/urlfmt" | ||
| ) | ||
|
|
@@ -35,7 +39,8 @@ func GetPipeline() pipeline.Fragment { | |
| return NewPipeline(enrichment.ManagerSingleton(), | ||
| datastore.Singleton(), | ||
| clusterDatastore.Singleton(), | ||
| reprocessor.Singleton()) | ||
| reprocessor.Singleton(), | ||
| ) | ||
| } | ||
|
|
||
| // NewPipeline returns a new instance of Pipeline. | ||
|
|
@@ -59,9 +64,35 @@ type pipelineImpl struct { | |
| enrichAndDetectLoop reprocessor.Loop | ||
| } | ||
|
|
||
| func (s *pipelineImpl) Reconcile(_ context.Context, _ string, _ *reconciliation.StoreMap) error { | ||
| // Nothing to reconcile for image integrations | ||
| return nil | ||
| func (s *pipelineImpl) Reconcile(ctx context.Context, clusterID string, storeMap *reconciliation.StoreMap) error { | ||
| existingIDs := set.NewStringSet() | ||
|
|
||
| conn := connection.FromContext(ctx) | ||
|
|
||
| // We should not reconcile image integrations unless the Sensor is on a version that can provide scoped integrations | ||
| // with consistent uuids | ||
| if !conn.HasCapability(centralsensor.ScopedImageIntegrations) { | ||
| return nil | ||
| } | ||
|
|
||
| integrations, err := s.datastore.GetImageIntegrations(ctx, &v1.GetImageIntegrationsRequest{}) | ||
| if err != nil { | ||
| return errors.Wrap(err, "getting image integrations for reconciliation") | ||
| } | ||
| for _, integration := range integrations { | ||
| // Skipping ECR image integrations since they are special and use AWS IAM. | ||
| if integration.GetEcr() != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. out of curiosity: why do we skip ECR integrations?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The ECR autogenerated integrations are special and use AWS IAM
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add a comment here, then? |
||
| continue | ||
| } | ||
| if integration.GetClusterId() != clusterID { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked at So if
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| continue | ||
| } | ||
| existingIDs.Add(integration.GetId()) | ||
| } | ||
| store := storeMap.Get((*central.SensorEvent_ImageIntegration)(nil)) | ||
| return reconciliation.Perform(store, existingIDs, "imageintegrations", func(id string) error { | ||
| return s.datastore.RemoveImageIntegration(ctx, id) | ||
| }) | ||
| } | ||
|
|
||
| func (s *pipelineImpl) Match(msg *central.MsgFromSensor) bool { | ||
|
|
@@ -139,11 +170,13 @@ func setUpIntegrationParams(ctx context.Context, imageIntegration *storage.Image | |
| switch config := imageIntegration.GetIntegrationConfig().(type) { | ||
| case *storage.ImageIntegration_Docker: | ||
| dockerCfg := config.Docker | ||
| var validTLS bool | ||
| validTLS, err = tlscheck.CheckTLS(ctx, dockerCfg.GetEndpoint()) | ||
| if err != nil { | ||
| err = errors.Wrapf(err, "reaching out for TLS check to %s", dockerCfg.GetEndpoint()) | ||
| return | ||
| validTLS, tlsErr := tlscheck.CheckTLS(ctx, dockerCfg.GetEndpoint()) | ||
| if tlsErr != nil { | ||
| log.Debugf("reaching out for TLS check to %s: %v", dockerCfg.GetEndpoint(), err) | ||
| // Not enough evidence that we can skip TLS, so conservatively require it. | ||
| dockerCfg.Insecure = false | ||
| } else { | ||
| dockerCfg.Insecure = !validTLS | ||
| } | ||
| dockerCfg.Insecure = !validTLS | ||
| dockerCfg.Endpoint = parseEndpointForURL(dockerCfg.GetEndpoint()) | ||
|
|
@@ -161,34 +194,7 @@ func setUpIntegrationParams(ctx context.Context, imageIntegration *storage.Image | |
| return | ||
| } | ||
|
|
||
| // Run runs the pipeline template on the input and returns the output. | ||
| // Action is currently always update. | ||
| func (s *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.MsgFromSensor, _ common.MessageInjector) error { | ||
| // Ignore autogenerated registries if they are disabled | ||
| if autogeneratedRegistriesDisabled { | ||
| return nil | ||
| } | ||
|
|
||
| defer countMetrics.IncrementResourceProcessedCounter(pipeline.ActionToOperation(msg.GetEvent().GetAction()), metrics.ImageIntegration) | ||
|
|
||
| // Extract the cluster name. | ||
| clusterName, exists, err := s.clusterDatastore.GetClusterName(ctx, clusterID) | ||
| if err != nil { | ||
| return errors.Wrapf(err, "error getting cluster name for cluster ID: %s", clusterID) | ||
| } | ||
| if !exists { | ||
| return fmt.Errorf("cluster with id %q does not exist", clusterID) | ||
| } | ||
|
|
||
| // Set up the integration and update its parameters. | ||
| imageIntegration := msg.GetEvent().GetImageIntegration() | ||
| description, matches, err := setUpIntegrationParams(ctx, imageIntegration) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| imageIntegration.Name = fmt.Sprintf("Autogenerated %s for cluster %s", description, clusterName) | ||
| imageIntegration.ClusterId = clusterID | ||
|
|
||
| func (s *pipelineImpl) legacyRun(ctx context.Context, imageIntegration *storage.ImageIntegration, matches matchingFunc) error { | ||
| // Fetch existing integration and determine if we should update by matching its configuration type. | ||
| existingIntegrations, err := s.datastore.GetImageIntegrations(ctx, &v1.GetImageIntegrationsRequest{}) | ||
| if err != nil { | ||
|
|
@@ -226,4 +232,82 @@ func (s *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.M | |
| return nil | ||
| } | ||
|
|
||
| func (s *pipelineImpl) runRemove(ctx context.Context, id string) error { | ||
| if err := s.integrationManager.Remove(id); err != nil { | ||
| return errors.Wrap(err, "removing integration from manager") | ||
| } | ||
| return s.datastore.RemoveImageIntegration(ctx, id) | ||
| } | ||
|
|
||
| // Run runs the pipeline template on the input and returns the output. | ||
| // Action is currently always update. | ||
| func (s *pipelineImpl) Run(ctx context.Context, clusterID string, msg *central.MsgFromSensor, _ common.MessageInjector) error { | ||
| // Ignore autogenerated registries if they are disabled | ||
| if autogeneratedRegistriesDisabled { | ||
| return nil | ||
| } | ||
| defer countMetrics.IncrementResourceProcessedCounter(pipeline.ActionToOperation(msg.GetEvent().GetAction()), metrics.ImageIntegration) | ||
|
|
||
| if msg.GetEvent().GetAction() == central.ResourceAction_REMOVE_RESOURCE { | ||
| return s.runRemove(ctx, msg.GetEvent().GetImageIntegration().GetId()) | ||
| } | ||
|
|
||
| // Extract the cluster name. | ||
| clusterName, exists, err := s.clusterDatastore.GetClusterName(ctx, clusterID) | ||
| if err != nil { | ||
| return errors.Wrapf(err, "error getting cluster name for cluster ID: %s", clusterID) | ||
| } | ||
| if !exists { | ||
| return errox.NotFound.Newf("cluster with id %q does not exist", clusterID) | ||
| } | ||
|
|
||
| // Set up the integration and update its parameters. | ||
| imageIntegration := msg.GetEvent().GetImageIntegration() | ||
| imageIntegration.ClusterId = clusterID | ||
|
|
||
| description, matches, err := setUpIntegrationParams(ctx, imageIntegration) | ||
| if err != nil { | ||
| return errors.Wrapf(err, "setting up integration params for %q", imageIntegration.GetId()) | ||
| } | ||
| source := imageIntegration.GetSource() | ||
| if source == nil { | ||
| return s.legacyRun(ctx, imageIntegration, matches) | ||
| } | ||
| imageIntegration.Name = fmt.Sprintf("Autogenerated %s for cluster %s from %s/%s", | ||
| description, clusterName, source.GetNamespace(), source.GetImagePullSecretName()) | ||
|
|
||
| integrationExisted, err := s.upsertImageIntegration(ctx, imageIntegration) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Currently, we will only trigger a reprocess of deployments when an image integration is newly added instead of | ||
| // updated. Reasoning behind this is that we currently do not have a way of scoping a reprocessing to specific | ||
| // deployments based on properties (i.e. deployments from the same cluster, deployments using a specific image | ||
| // integration). This combined with the frequent updates of image integrations on OpenShift and the fact that | ||
| // deployments will be reprocessed in ~1hr, we will skip reprocessing on updates for now. | ||
| if !integrationExisted { | ||
| s.enrichAndDetectLoop.ShortCircuit() | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (s *pipelineImpl) OnFinish(_ string) {} | ||
|
|
||
| // upsertImageIntegration will add the given image integration within the datastore. | ||
| // It will return any errors during the creation and whether the integration existed previously. | ||
| func (s *pipelineImpl) upsertImageIntegration(ctx context.Context, imageIntegration *storage.ImageIntegration) (existed bool, err error) { | ||
| _, existed, err = s.datastore.GetImageIntegration(ctx, imageIntegration.GetId()) | ||
| if err != nil { | ||
| return existed, errors.Wrapf(err, "retrieving image integration %q", imageIntegration.GetId()) | ||
| } | ||
|
|
||
| if _, err := s.datastore.AddImageIntegration(ctx, imageIntegration); err != nil { | ||
| return existed, errors.Wrapf(err, "adding image integration %q", imageIntegration.GetId()) | ||
| } | ||
| if err := s.integrationManager.Upsert(imageIntegration); err != nil { | ||
| return existed, errors.Wrapf(err, "notifying of update for image integration %q", imageIntegration.GetId()) | ||
| } | ||
| return existed, nil | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can already filter the integrations by cluster here instead of within line 90.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, so long story, this doesn't work. I'll fix it in a follow up