diff --git a/sensor/common/detector/builder.go b/sensor/common/detector/builder.go new file mode 100644 index 0000000000000..8b75cf259e2e2 --- /dev/null +++ b/sensor/common/detector/builder.go @@ -0,0 +1,223 @@ +package detector + +import ( + "github.com/pkg/errors" + "github.com/stackrox/rox/generated/internalapi/sensor" + "github.com/stackrox/rox/pkg/concurrency" + "github.com/stackrox/rox/pkg/env" + "github.com/stackrox/rox/pkg/scopecomp" + queueScaler "github.com/stackrox/rox/pkg/sensor/queue" + "github.com/stackrox/rox/sensor/common/admissioncontroller" + "github.com/stackrox/rox/sensor/common/detector/baseline" + detectorMetrics "github.com/stackrox/rox/sensor/common/detector/metrics" + networkBaselineEval "github.com/stackrox/rox/sensor/common/detector/networkbaseline" + "github.com/stackrox/rox/sensor/common/detector/queue" + "github.com/stackrox/rox/sensor/common/detector/unified" + "github.com/stackrox/rox/sensor/common/enforcer" + "github.com/stackrox/rox/sensor/common/externalsrcs" + "github.com/stackrox/rox/sensor/common/image/cache" + "github.com/stackrox/rox/sensor/common/message" + "github.com/stackrox/rox/sensor/common/registry" + "github.com/stackrox/rox/sensor/common/scan" + "github.com/stackrox/rox/sensor/common/store" + "github.com/stackrox/rox/sensor/common/updater" +) + +// Builder constructs a Detector with a fluent API. +type Builder struct { + clusterID ClusterIDProvider + enforcer enforcer.Enforcer + admCtrlSettingsMgr admissioncontroller.SettingsManager + deploymentStore store.DeploymentStore + serviceAccountStore store.ServiceAccountStore + imageCache cache.Image + auditLogEvents chan *sensor.AuditEvents + auditLogUpdater updater.Component + networkPolicyStore store.NetworkPolicyStore + registryStore *registry.Store + localScan *scan.LocalScan + nodeStore store.NodeStore + clusterLabelProvider scopecomp.ClusterLabelProvider + namespaceLabelProvider scopecomp.NamespaceLabelProvider +} + +// NewBuilder returns a new Builder for constructing a Detector. +func NewBuilder() *Builder { + return &Builder{} +} + +func (b *Builder) WithClusterID(id ClusterIDProvider) *Builder { + b.clusterID = id + return b +} + +func (b *Builder) WithEnforcer(e enforcer.Enforcer) *Builder { + b.enforcer = e + return b +} + +func (b *Builder) WithAdmCtrlSettingsMgr(mgr admissioncontroller.SettingsManager) *Builder { + b.admCtrlSettingsMgr = mgr + return b +} + +func (b *Builder) WithDeploymentStore(ds store.DeploymentStore) *Builder { + b.deploymentStore = ds + return b +} + +func (b *Builder) WithServiceAccountStore(sas store.ServiceAccountStore) *Builder { + b.serviceAccountStore = sas + return b +} + +func (b *Builder) WithImageCache(c cache.Image) *Builder { + b.imageCache = c + return b +} + +func (b *Builder) WithAuditLogEvents(events chan *sensor.AuditEvents) *Builder { + b.auditLogEvents = events + return b +} + +func (b *Builder) WithAuditLogUpdater(u updater.Component) *Builder { + b.auditLogUpdater = u + return b +} + +func (b *Builder) WithNetworkPolicyStore(nps store.NetworkPolicyStore) *Builder { + b.networkPolicyStore = nps + return b +} + +func (b *Builder) WithRegistryStore(rs *registry.Store) *Builder { + b.registryStore = rs + return b +} + +func (b *Builder) WithLocalScan(ls *scan.LocalScan) *Builder { + b.localScan = ls + return b +} + +func (b *Builder) WithNodeStore(ns store.NodeStore) *Builder { + b.nodeStore = ns + return b +} + +func (b *Builder) WithClusterLabelProvider(clp scopecomp.ClusterLabelProvider) *Builder { + b.clusterLabelProvider = clp + return b +} + +func (b *Builder) WithNamespaceLabelProvider(nlp scopecomp.NamespaceLabelProvider) *Builder { + b.namespaceLabelProvider = nlp + return b +} + +func (b *Builder) validate() error { + type field struct { + name string + value any + } + for _, f := range []field{ + {"ClusterID", b.clusterID}, + {"Enforcer", b.enforcer}, + {"DeploymentStore", b.deploymentStore}, + {"ServiceAccountStore", b.serviceAccountStore}, + {"ImageCache", b.imageCache}, + {"AuditLogEvents", b.auditLogEvents}, + {"AuditLogUpdater", b.auditLogUpdater}, + {"NetworkPolicyStore", b.networkPolicyStore}, + {"RegistryStore", b.registryStore}, + {"LocalScan", b.localScan}, + {"NodeStore", b.nodeStore}, + {"ClusterLabelProvider", b.clusterLabelProvider}, + {"NamespaceLabelProvider", b.namespaceLabelProvider}, + } { + if f.value == nil { + return errors.Errorf("detector.Builder: %s is required", f.name) + } + } + return nil +} + +// Build creates a new Detector from the builder configuration. +func (b *Builder) Build() (Detector, error) { + if err := b.validate(); err != nil { + return nil, err + } + + detectorStopper := concurrency.NewStopper() + netFlowQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorNetworkFlowBufferSize) + piQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorProcessIndicatorBufferSize) + fileAccessQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorFileAccessBufferSize) + deploymentQueueSize := 0 + if env.DetectorDeploymentBufferSize.IntegerSetting() > 0 { + deploymentQueueSize = queueScaler.ScaleSizeOnNonDefault(env.DetectorDeploymentBufferSize) + } + netFlowQueue := queue.NewQueue[*queue.FlowQueueItem]( + detectorStopper, + "FlowsQueue", + netFlowQueueSize, + detectorMetrics.DetectorNetworkFlowQueueOperations, + detectorMetrics.DetectorNetworkFlowDroppedCount, + ) + piQueue := queue.NewQueue[*queue.IndicatorQueueItem]( + detectorStopper, + "PIsQueue", + piQueueSize, + detectorMetrics.DetectorProcessIndicatorQueueOperations, + detectorMetrics.DetectorProcessIndicatorDroppedCount, + ) + // We only need the SimpleQueue since the deploymentQueue will not be paused/resumed + deploymentQueue := queue.NewSimpleQueue[*queue.DeploymentQueueItem]( + "DeploymentQueue", + deploymentQueueSize, + detectorMetrics.DetectorDeploymentQueueOperations, + detectorMetrics.DetectorDeploymentDroppedCount, + ) + + fileAccessQueue := queue.NewQueue[*queue.FileAccessQueueItem]( + detectorStopper, + "FileAccessQueue", + fileAccessQueueSize, + detectorMetrics.DetectorFileAccessQueueOperations, + detectorMetrics.DetectorFileAccessDroppedCount, + ) + + return &detectorImpl{ + unifiedDetector: unified.NewDetector(b.clusterLabelProvider, b.namespaceLabelProvider), + + output: make(chan *message.ExpiringMessage), + auditEventsChan: b.auditLogEvents, + deploymentAlertOutputChan: make(chan outputResult), + deploymentProcessingMap: make(map[string]int64), + + enricher: newEnricher(b.clusterID, b.imageCache, b.serviceAccountStore, b.registryStore, b.localScan), + serviceAccountStore: b.serviceAccountStore, + deploymentStore: b.deploymentStore, + nodeStore: b.nodeStore, + extSrcsStore: externalsrcs.StoreInstance(), + baselineEval: baseline.NewBaselineEvaluator(), + networkbaselineEval: networkBaselineEval.NewNetworkBaselineEvaluator(), + deduper: newDeduper(), + enforcer: b.enforcer, + + admCtrlSettingsMgr: b.admCtrlSettingsMgr, + auditLogUpdater: b.auditLogUpdater, + + detectorStopper: detectorStopper, + auditStopper: concurrency.NewStopper(), + serializerStopper: concurrency.NewStopper(), + alertStopSig: concurrency.NewSignal(), + + networkPolicyStore: b.networkPolicyStore, + + networkFlowsQueue: netFlowQueue, + indicatorsQueue: piQueue, + deploymentsQueue: deploymentQueue, + fileAccessQueue: fileAccessQueue, + }, nil +} diff --git a/sensor/common/detector/detector.go b/sensor/common/detector/detector.go index 9fd3cada9e72a..ce0bd4d53d485 100644 --- a/sensor/common/detector/detector.go +++ b/sensor/common/detector/detector.go @@ -15,15 +15,12 @@ import ( "github.com/stackrox/rox/pkg/booleanpolicy/networkpolicy" "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/concurrency" - "github.com/stackrox/rox/pkg/env" "github.com/stackrox/rox/pkg/errorhelpers" "github.com/stackrox/rox/pkg/errox" "github.com/stackrox/rox/pkg/logging" "github.com/stackrox/rox/pkg/networkgraph" "github.com/stackrox/rox/pkg/networkgraph/networkbaseline" "github.com/stackrox/rox/pkg/protocompat" - "github.com/stackrox/rox/pkg/scopecomp" - queueScaler "github.com/stackrox/rox/pkg/sensor/queue" "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/sensor/common" "github.com/stackrox/rox/sensor/common/admissioncontroller" @@ -38,8 +35,6 @@ import ( "github.com/stackrox/rox/sensor/common/image/cache" "github.com/stackrox/rox/sensor/common/message" "github.com/stackrox/rox/sensor/common/metrics" - "github.com/stackrox/rox/sensor/common/registry" - "github.com/stackrox/rox/sensor/common/scan" "github.com/stackrox/rox/sensor/common/store" "github.com/stackrox/rox/sensor/common/updater" "google.golang.org/grpc" @@ -70,85 +65,6 @@ type Detector interface { ProcessFileAccess(ctx context.Context, access *storage.FileAccess) } -// New returns a new detector -// TODO(ROX-33799): Refactor to use builder pattern to reduce parameter count -func New(clusterID clusterIDPeekWaiter, enforcer enforcer.Enforcer, admCtrlSettingsMgr admissioncontroller.SettingsManager, - deploymentStore store.DeploymentStore, serviceAccountStore store.ServiceAccountStore, cache cache.Image, auditLogEvents chan *sensor.AuditEvents, - auditLogUpdater updater.Component, networkPolicyStore store.NetworkPolicyStore, registryStore *registry.Store, localScan *scan.LocalScan, nodeStore store.NodeStore, - clusterLabelProvider scopecomp.ClusterLabelProvider, namespaceLabelProvider scopecomp.NamespaceLabelProvider) Detector { - detectorStopper := concurrency.NewStopper() - netFlowQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorNetworkFlowBufferSize) - piQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorProcessIndicatorBufferSize) - fileAccessQueueSize := queueScaler.ScaleSizeOnNonDefault(env.DetectorFileAccessBufferSize) - deploymentQueueSize := 0 - if env.DetectorDeploymentBufferSize.IntegerSetting() > 0 { - deploymentQueueSize = queueScaler.ScaleSizeOnNonDefault(env.DetectorDeploymentBufferSize) - } - netFlowQueue := queue.NewQueue[*queue.FlowQueueItem]( - detectorStopper, - "FlowsQueue", - netFlowQueueSize, - detectorMetrics.DetectorNetworkFlowQueueOperations, - detectorMetrics.DetectorNetworkFlowDroppedCount, - ) - piQueue := queue.NewQueue[*queue.IndicatorQueueItem]( - detectorStopper, - "PIsQueue", - piQueueSize, - detectorMetrics.DetectorProcessIndicatorQueueOperations, - detectorMetrics.DetectorProcessIndicatorDroppedCount, - ) - // We only need the SimpleQueue since the deploymentQueue will not be paused/resumed - deploymentQueue := queue.NewSimpleQueue[*queue.DeploymentQueueItem]( - "DeploymentQueue", - deploymentQueueSize, - detectorMetrics.DetectorDeploymentQueueOperations, - detectorMetrics.DetectorDeploymentDroppedCount, - ) - - fileAccessQueue := queue.NewQueue[*queue.FileAccessQueueItem]( - detectorStopper, - "FileAccessQueue", - fileAccessQueueSize, - detectorMetrics.DetectorFileAccessQueueOperations, - detectorMetrics.DetectorFileAccessDroppedCount, - ) - - return &detectorImpl{ - unifiedDetector: unified.NewDetector(clusterLabelProvider, namespaceLabelProvider), - - output: make(chan *message.ExpiringMessage), - auditEventsChan: auditLogEvents, - deploymentAlertOutputChan: make(chan outputResult), - deploymentProcessingMap: make(map[string]int64), - - enricher: newEnricher(clusterID, cache, serviceAccountStore, registryStore, localScan), - serviceAccountStore: serviceAccountStore, - deploymentStore: deploymentStore, - nodeStore: nodeStore, - extSrcsStore: externalsrcs.StoreInstance(), - baselineEval: baseline.NewBaselineEvaluator(), - networkbaselineEval: networkBaselineEval.NewNetworkBaselineEvaluator(), - deduper: newDeduper(), - enforcer: enforcer, - - admCtrlSettingsMgr: admCtrlSettingsMgr, - auditLogUpdater: auditLogUpdater, - - detectorStopper: detectorStopper, - auditStopper: concurrency.NewStopper(), - serializerStopper: concurrency.NewStopper(), - alertStopSig: concurrency.NewSignal(), - - networkPolicyStore: networkPolicyStore, - - networkFlowsQueue: netFlowQueue, - indicatorsQueue: piQueue, - deploymentsQueue: deploymentQueue, - fileAccessQueue: fileAccessQueue, - } -} - type detectorImpl struct { unifiedDetector unified.Detector diff --git a/sensor/common/detector/enricher.go b/sensor/common/detector/enricher.go index 37f0e016a9a49..ceb30e7449e88 100644 --- a/sensor/common/detector/enricher.go +++ b/sensor/common/detector/enricher.go @@ -56,10 +56,11 @@ type enricher struct { imageCache cache.Image stopSig concurrency.Signal regStore *registry.Store - clusterID clusterIDPeekWaiter + clusterID ClusterIDProvider } -type clusterIDPeekWaiter interface { +// ClusterIDProvider provides cluster ID access with optional waiting. +type ClusterIDProvider interface { Get() string GetNoWait() string } @@ -260,7 +261,7 @@ func (c *cacheValue) updateImageNoLock(image *storage.Image) { c.image.Names = protoutils.SliceUnique(append(c.image.GetNames(), existingNames...)) } -func newEnricher(clusterID clusterIDPeekWaiter, cache cache.Image, serviceAccountStore store.ServiceAccountStore, registryStore *registry.Store, localScan *scan.LocalScan) *enricher { +func newEnricher(clusterID ClusterIDProvider, cache cache.Image, serviceAccountStore store.ServiceAccountStore, registryStore *registry.Store, localScan *scan.LocalScan) *enricher { return &enricher{ scanResultChan: make(chan scanResult), serviceAccountStore: serviceAccountStore, diff --git a/sensor/kubernetes/sensor/sensor.go b/sensor/kubernetes/sensor/sensor.go index 0dcde7766de08..59628c695cd7f 100644 --- a/sensor/kubernetes/sensor/sensor.go +++ b/sensor/kubernetes/sensor/sensor.go @@ -148,7 +148,26 @@ func CreateSensor(cfg *CreateOptions) (*sensor.Sensor, error) { pubSub := internalmessage.NewMessageSubscriber() - policyDetector := detector.New(clusterID, enforcer, admCtrlSettingsMgr, storeProvider.Deployments(), storeProvider.ServiceAccounts(), imageCache, auditLogEventsInput, auditLogCollectionManager, storeProvider.NetworkPolicies(), storeProvider.Registries(), localScan, storeProvider.Nodes(), storeProvider.ClusterLabels(), storeProvider.NamespaceLabels()) + policyDetector, err := detector.NewBuilder(). + WithClusterID(clusterID). + WithEnforcer(enforcer). + WithAdmCtrlSettingsMgr(admCtrlSettingsMgr). + WithDeploymentStore(storeProvider.Deployments()). + WithServiceAccountStore(storeProvider.ServiceAccounts()). + WithImageCache(imageCache). + WithAuditLogEvents(auditLogEventsInput). + WithAuditLogUpdater(auditLogCollectionManager). + WithNetworkPolicyStore(storeProvider.NetworkPolicies()). + WithRegistryStore(storeProvider.Registries()). + WithLocalScan(localScan). + WithNodeStore(storeProvider.Nodes()). + WithClusterLabelProvider(storeProvider.ClusterLabels()). + WithNamespaceLabelProvider(storeProvider.NamespaceLabels()). + Build() + if err != nil { + return nil, errors.Wrap(err, "creating detector") + } + reprocessorHandler := reprocessor.NewHandler(admCtrlSettingsMgr, policyDetector, imageCache) pipeline, err := eventpipeline.New(clusterID, cfg.k8sClient, configHandler, policyDetector, reprocessorHandler, k8sNodeName.Setting(), cfg.traceWriter, storeProvider, cfg.eventPipelineQueueSize, pubSub, internalMessageDispatcher) if err != nil {