From 50d4e569428ce64715912401e9a26f078a51b588 Mon Sep 17 00:00:00 2001 From: Giles Hutton Date: Wed, 1 Apr 2026 12:21:31 +0100 Subject: [PATCH] ROX-33799: replace detector.New() with builder pattern The 14-parameter positional constructor was hard to read and extend. Replaced with a Builder struct and chained WithX() methods as described in the ticket. Exported the ClusterIDProvider interface (formerly clusterIDPeekWaiter) to support the exported Builder type. Build() validates that the 13 required dependencies are set, returning an error if any are missing. Only admCtrlSettingsMgr is optional (nil-guarded in the detector implementation). Co-Authored-By: Claude Opus 4.6 (1M context) --- sensor/common/detector/builder.go | 223 +++++++++++++++++++++++++++++ sensor/common/detector/detector.go | 84 ----------- sensor/common/detector/enricher.go | 7 +- sensor/kubernetes/sensor/sensor.go | 21 ++- 4 files changed, 247 insertions(+), 88 deletions(-) create mode 100644 sensor/common/detector/builder.go diff --git a/sensor/common/detector/builder.go b/sensor/common/detector/builder.go new file mode 100644 index 0000000000000..8b75cf259e2e2 --- /dev/null +++ b/sensor/common/detector/builder.go @@ -0,0 +1,223 @@ +package detector + +import ( + "github.com/pkg/errors" + "github.com/stackrox/rox/generated/internalapi/sensor" + "github.com/stackrox/rox/pkg/concurrency" + "github.com/stackrox/rox/pkg/env" + "github.com/stackrox/rox/pkg/scopecomp" + queueScaler "github.com/stackrox/rox/pkg/sensor/queue" + "github.com/stackrox/rox/sensor/common/admissioncontroller" + "github.com/stackrox/rox/sensor/common/detector/baseline" + detectorMetrics "github.com/stackrox/rox/sensor/common/detector/metrics" + networkBaselineEval "github.com/stackrox/rox/sensor/common/detector/networkbaseline" + "github.com/stackrox/rox/sensor/common/detector/queue" + "github.com/stackrox/rox/sensor/common/detector/unified" + "github.com/stackrox/rox/sensor/common/enforcer" + "github.com/stackrox/rox/sensor/common/externalsrcs" + "github.com/stackrox/rox/sensor/common/image/cache" + "github.com/stackrox/rox/sensor/common/message" + "github.com/stackrox/rox/sensor/common/registry" + "github.com/stackrox/rox/sensor/common/scan" + "github.com/stackrox/rox/sensor/common/store" + "github.com/stackrox/rox/sensor/common/updater" +) + +// Builder constructs a Detector with a fluent API. +type Builder struct { + clusterID ClusterIDProvider + enforcer enforcer.Enforcer + admCtrlSettingsMgr admissioncontroller.SettingsManager + deploymentStore store.DeploymentStore + serviceAccountStore store.ServiceAccountStore + imageCache cache.Image + auditLogEvents chan *sensor.AuditEvents + auditLogUpdater updater.Component + networkPolicyStore store.NetworkPolicyStore + registryStore *registry.Store + localScan *scan.LocalScan + nodeStore store.NodeStore + clusterLabelProvider scopecomp.ClusterLabelProvider + namespaceLabelProvider scopecomp.NamespaceLabelProvider +} + +// NewBuilder returns a new Builder for constructing a Detector. +func NewBuilder() *Builder { + return &Builder{} +} + +func (b *Builder) WithClusterID(id ClusterIDProvider) *Builder { + b.clusterID = id + return b +} + +func (b *Builder) WithEnforcer(e enforcer.Enforcer) *Builder { + b.enforcer = e + return b +} + +func (b *Builder) WithAdmCtrlSettingsMgr(mgr admissioncontroller.SettingsManager) *Builder { + b.admCtrlSettingsMgr = mgr + return b +} + +func (b *Builder) WithDeploymentStore(ds store.DeploymentStore) *Builder { + b.deploymentStore = ds + return b +} + +func (b *Builder) WithServiceAccountStore(sas store.ServiceAccountStore) *Builder { + b.serviceAccountStore = sas + return b +} + +func (b *Builder) WithImageCache(c cache.Image) *Builder { + b.imageCache = c + return b +} + +func (b *Builder) WithAuditLogEvents(events chan *sensor.AuditEvents) *Builder { + b.auditLogEvents = events + return b +} + +func (b *Builder) WithAuditLogUpdater(u updater.Component) *Builder { + b.auditLogUpdater = u + return b +} + +func (b *Builder) WithNetworkPolicyStore(nps store.NetworkPolicyStore) *Builder { + b.networkPolicyStore = nps + return b +} + +func (b *Builder) WithRegistryStore(rs *registry.Store) *Builder { + b.registryStore = rs + return b +} + +func (b *Builder) WithLocalScan(ls *scan.LocalScan) *Builder { + b.localScan = ls + return b +} + +func (b *Builder) WithNodeStore(ns store.NodeStore) *Builder { + b.nodeStore = ns + return b +} + +func (b *Builder) WithClusterLabelProvider(clp scopecomp.ClusterLabelProvider) *Builder { + b.clusterLabelProvider = clp + return b +} + +func (b *Builder) WithNamespaceLabelProvider(nlp scopecomp.NamespaceLabelProvider) *Builder { + b.namespaceLabelProvider = nlp + return b +} + +func (b *Builder) validate() error { + type field struct { + name string + value any + } + for _, f := range []field{ + {"ClusterID", b.clusterID}, + {"Enforcer", b.enforcer}, + {"DeploymentStore", b.deploymentStore}, + {"ServiceAccountStore", b.serviceAccountStore}, + {"ImageCache", b.imageCache}, + {"AuditLogEvents", b.auditLogEvents}, + {"AuditLogUpdater", b.auditLogUpdater}, + {"NetworkPolicyStore", b.networkPolicyStore}, + {"RegistryStore", b.registryStore}, + {"LocalScan", b.localScan}, + {"NodeStore", b.nodeStore}, + {"ClusterLabelProvider", b.clusterLabelProvider}, + {"NamespaceLabelProvider", b.namespaceLabelProvider}, + } { + if f.value == nil { + return errors.Errorf("detector.Builder: %s is required", f.name) + } + } + return nil +} + +// Build creates a new Detector from the builder configuration. +func (b *Builder) Build() (Detector, error) { + if err := b.validate(); err != nil { + return nil, err + } + + detectorStopper := concurrency.NewStopper() + netFlowQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorNetworkFlowBufferSize) + piQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorProcessIndicatorBufferSize) + fileAccessQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorFileAccessBufferSize) + deploymentQueueSize := 0 + if env.DetectorDeploymentBufferSize.IntegerSetting() > 0 { + deploymentQueueSize = queueScaler.ScaleSizeOnNonDefault(env.DetectorDeploymentBufferSize) + } + netFlowQueue := queue.NewQueue[*queue.FlowQueueItem]( + detectorStopper, + "FlowsQueue", + netFlowQueueSize, + detectorMetrics.DetectorNetworkFlowQueueOperations, + detectorMetrics.DetectorNetworkFlowDroppedCount, + ) + piQueue := queue.NewQueue[*queue.IndicatorQueueItem]( + detectorStopper, + "PIsQueue", + piQueueSize, + detectorMetrics.DetectorProcessIndicatorQueueOperations, + detectorMetrics.DetectorProcessIndicatorDroppedCount, + ) + // We only need the SimpleQueue since the deploymentQueue will not be paused/resumed + deploymentQueue := queue.NewSimpleQueue[*queue.DeploymentQueueItem]( + "DeploymentQueue", + deploymentQueueSize, + detectorMetrics.DetectorDeploymentQueueOperations, + detectorMetrics.DetectorDeploymentDroppedCount, + ) + + fileAccessQueue := queue.NewQueue[*queue.FileAccessQueueItem]( + detectorStopper, + "FileAccessQueue", + fileAccessQueueSize, + detectorMetrics.DetectorFileAccessQueueOperations, + detectorMetrics.DetectorFileAccessDroppedCount, + ) + + return &detectorImpl{ + unifiedDetector: unified.NewDetector(b.clusterLabelProvider, b.namespaceLabelProvider), + + output: make(chan *message.ExpiringMessage), + auditEventsChan: b.auditLogEvents, + deploymentAlertOutputChan: make(chan outputResult), + deploymentProcessingMap: make(map[string]int64), + + enricher: newEnricher(b.clusterID, b.imageCache, b.serviceAccountStore, b.registryStore, b.localScan), + serviceAccountStore: b.serviceAccountStore, + deploymentStore: b.deploymentStore, + nodeStore: b.nodeStore, + extSrcsStore: externalsrcs.StoreInstance(), + baselineEval: baseline.NewBaselineEvaluator(), + networkbaselineEval: networkBaselineEval.NewNetworkBaselineEvaluator(), + deduper: newDeduper(), + enforcer: b.enforcer, + + admCtrlSettingsMgr: b.admCtrlSettingsMgr, + auditLogUpdater: b.auditLogUpdater, + + detectorStopper: detectorStopper, + auditStopper: concurrency.NewStopper(), + serializerStopper: concurrency.NewStopper(), + alertStopSig: concurrency.NewSignal(), + + networkPolicyStore: b.networkPolicyStore, + + networkFlowsQueue: netFlowQueue, + indicatorsQueue: piQueue, + deploymentsQueue: deploymentQueue, + fileAccessQueue: fileAccessQueue, + }, nil +} diff --git a/sensor/common/detector/detector.go b/sensor/common/detector/detector.go index 9fd3cada9e72a..ce0bd4d53d485 100644 --- a/sensor/common/detector/detector.go +++ b/sensor/common/detector/detector.go @@ -15,15 +15,12 @@ import ( "github.com/stackrox/rox/pkg/booleanpolicy/networkpolicy" "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" - "github.com/stackrox/rox/pkg/env" "github.com/stackrox/rox/pkg/errorhelpers" "github.com/stackrox/rox/pkg/errox" "github.com/stackrox/rox/pkg/logging" "github.com/stackrox/rox/pkg/networkgraph" "github.com/stackrox/rox/pkg/networkgraph/networkbaseline" "github.com/stackrox/rox/pkg/protocompat" - "github.com/stackrox/rox/pkg/scopecomp" - queueScaler "github.com/stackrox/rox/pkg/sensor/queue" "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/sensor/common" "github.com/stackrox/rox/sensor/common/admissioncontroller" @@ -38,8 +35,6 @@ import ( "github.com/stackrox/rox/sensor/common/image/cache" "github.com/stackrox/rox/sensor/common/message" "github.com/stackrox/rox/sensor/common/metrics" - "github.com/stackrox/rox/sensor/common/registry" - "github.com/stackrox/rox/sensor/common/scan" "github.com/stackrox/rox/sensor/common/store" "github.com/stackrox/rox/sensor/common/updater" "google.golang.org/grpc" @@ -70,85 +65,6 @@ type Detector interface { ProcessFileAccess(ctx context.Context, access *storage.FileAccess) } -// New returns a new detector -// TODO(ROX-33799): Refactor to use builder pattern to reduce parameter count -func New(clusterID clusterIDPeekWaiter, enforcer enforcer.Enforcer, admCtrlSettingsMgr admissioncontroller.SettingsManager, - deploymentStore store.DeploymentStore, serviceAccountStore store.ServiceAccountStore, cache cache.Image, auditLogEvents chan *sensor.AuditEvents, - auditLogUpdater updater.Component, networkPolicyStore store.NetworkPolicyStore, registryStore *registry.Store, localScan *scan.LocalScan, nodeStore store.NodeStore, - clusterLabelProvider scopecomp.ClusterLabelProvider, namespaceLabelProvider scopecomp.NamespaceLabelProvider) Detector { - detectorStopper := concurrency.NewStopper() - netFlowQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorNetworkFlowBufferSize) - piQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorProcessIndicatorBufferSize) - fileAccessQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorFileAccessBufferSize) - deploymentQueueSize := 0 - if env.DetectorDeploymentBufferSize.IntegerSetting() > 0 { - deploymentQueueSize = queueScaler.ScaleSizeOnNonDefault(env.DetectorDeploymentBufferSize) - } - netFlowQueue := queue.NewQueue[*queue.FlowQueueItem]( - detectorStopper, - "FlowsQueue", - netFlowQueueSize, - detectorMetrics.DetectorNetworkFlowQueueOperations, - detectorMetrics.DetectorNetworkFlowDroppedCount, - ) - piQueue := queue.NewQueue[*queue.IndicatorQueueItem]( - detectorStopper, - "PIsQueue", - piQueueSize, - detectorMetrics.DetectorProcessIndicatorQueueOperations, - detectorMetrics.DetectorProcessIndicatorDroppedCount, - ) - // We only need the SimpleQueue since the deploymentQueue will not be paused/resumed - deploymentQueue := queue.NewSimpleQueue[*queue.DeploymentQueueItem]( - "DeploymentQueue", - deploymentQueueSize, - detectorMetrics.DetectorDeploymentQueueOperations, - detectorMetrics.DetectorDeploymentDroppedCount, - ) - - fileAccessQueue := queue.NewQueue[*queue.FileAccessQueueItem]( - detectorStopper, - "FileAccessQueue", - fileAccessQueueSize, - detectorMetrics.DetectorFileAccessQueueOperations, - detectorMetrics.DetectorFileAccessDroppedCount, - ) - - return &detectorImpl{ - unifiedDetector: unified.NewDetector(clusterLabelProvider, namespaceLabelProvider), - - output: make(chan *message.ExpiringMessage), - auditEventsChan: auditLogEvents, - deploymentAlertOutputChan: make(chan outputResult), - deploymentProcessingMap: make(map[string]int64), - - enricher: newEnricher(clusterID, cache, serviceAccountStore, registryStore, localScan), - serviceAccountStore: serviceAccountStore, - deploymentStore: deploymentStore, - nodeStore: nodeStore, - extSrcsStore: externalsrcs.StoreInstance(), - baselineEval: baseline.NewBaselineEvaluator(), - networkbaselineEval: networkBaselineEval.NewNetworkBaselineEvaluator(), - deduper: newDeduper(), - enforcer: enforcer, - - admCtrlSettingsMgr: admCtrlSettingsMgr, - auditLogUpdater: auditLogUpdater, - - detectorStopper: detectorStopper, - auditStopper: concurrency.NewStopper(), - serializerStopper: concurrency.NewStopper(), - alertStopSig: concurrency.NewSignal(), - - networkPolicyStore: networkPolicyStore, - - networkFlowsQueue: netFlowQueue, - indicatorsQueue: piQueue, - deploymentsQueue: deploymentQueue, - fileAccessQueue: fileAccessQueue, - } -} - type detectorImpl struct { unifiedDetector unified.Detector diff --git a/sensor/common/detector/enricher.go b/sensor/common/detector/enricher.go index 37f0e016a9a49..ceb30e7449e88 100644 --- a/sensor/common/detector/enricher.go +++ b/sensor/common/detector/enricher.go @@ -56,10 +56,11 @@ type enricher struct { imageCache cache.Image stopSig concurrency.Signal regStore *registry.Store - clusterID clusterIDPeekWaiter + clusterID ClusterIDProvider } -type clusterIDPeekWaiter interface { +// ClusterIDProvider provides cluster ID access with optional waiting. +type ClusterIDProvider interface { Get() string GetNoWait() string } @@ -260,7 +261,7 @@ func (c *cacheValue) updateImageNoLock(image *storage.Image) { c.image.Names = protoutils.SliceUnique(append(c.image.GetNames(), existingNames...)) } -func newEnricher(clusterID clusterIDPeekWaiter, cache cache.Image, serviceAccountStore store.ServiceAccountStore, registryStore *registry.Store, localScan *scan.LocalScan) *enricher { +func newEnricher(clusterID ClusterIDProvider, cache cache.Image, serviceAccountStore store.ServiceAccountStore, registryStore *registry.Store, localScan *scan.LocalScan) *enricher { return &enricher{ scanResultChan: make(chan scanResult), serviceAccountStore: serviceAccountStore, diff --git a/sensor/kubernetes/sensor/sensor.go b/sensor/kubernetes/sensor/sensor.go index 0dcde7766de08..59628c695cd7f 100644 --- a/sensor/kubernetes/sensor/sensor.go +++ b/sensor/kubernetes/sensor/sensor.go @@ -148,7 +148,26 @@ func CreateSensor(cfg *CreateOptions) (*sensor.Sensor, error) { pubSub := internalmessage.NewMessageSubscriber() - policyDetector := detector.New(clusterID, enforcer, admCtrlSettingsMgr, storeProvider.Deployments(), storeProvider.ServiceAccounts(), imageCache, auditLogEventsInput, auditLogCollectionManager, storeProvider.NetworkPolicies(), storeProvider.Registries(), localScan, storeProvider.Nodes(), storeProvider.ClusterLabels(), storeProvider.NamespaceLabels()) + policyDetector, err := detector.NewBuilder(). + WithClusterID(clusterID). + WithEnforcer(enforcer). + WithAdmCtrlSettingsMgr(admCtrlSettingsMgr). + WithDeploymentStore(storeProvider.Deployments()). + WithServiceAccountStore(storeProvider.ServiceAccounts()). + WithImageCache(imageCache). + WithAuditLogEvents(auditLogEventsInput). + WithAuditLogUpdater(auditLogCollectionManager). + WithNetworkPolicyStore(storeProvider.NetworkPolicies()). + WithRegistryStore(storeProvider.Registries()). + WithLocalScan(localScan). + WithNodeStore(storeProvider.Nodes()). + WithClusterLabelProvider(storeProvider.ClusterLabels()). + WithNamespaceLabelProvider(storeProvider.NamespaceLabels()). + Build() + if err != nil { + return nil, errors.Wrap(err, "creating detector") + } + reprocessorHandler := reprocessor.NewHandler(admCtrlSettingsMgr, policyDetector, imageCache) pipeline, err := eventpipeline.New(clusterID, cfg.k8sClient, configHandler, policyDetector, reprocessorHandler, k8sNodeName.Setting(), cfg.traceWriter, storeProvider, cfg.eventPipelineQueueSize, pubSub, internalMessageDispatcher) if err != nil {