diff --git a/central/detection/lifecycle/manager_impl.go b/central/detection/lifecycle/manager_impl.go index 4381222e4f78c..ab97e249add96 100644 --- a/central/detection/lifecycle/manager_impl.go +++ b/central/detection/lifecycle/manager_impl.go @@ -110,21 +110,20 @@ func (m *managerImpl) buildIndicatorFilter() { return } - var processesToRemove []string + processesToRemove := make([]string, 0, len(deploymentIDs)) walkFn := func() error { - deploymentIDSet := set.NewStringSet(deploymentIDs...) processesToRemove = processesToRemove[:0] - return m.processesDataStore.WalkAll(ctx, func(pi *storage.ProcessIndicator) error { - if !deploymentIDSet.Contains(pi.GetDeploymentId()) { - // Don't remove as these processes will be removed by GC - // but don't add to the filter - return nil - } + + // Only process indicators for existing deployments + fn := func(pi *storage.ProcessIndicator) error { if !m.processFilter.Add(pi) { processesToRemove = append(processesToRemove, pi.GetId()) } return nil - }) + } + + query := search.NewQueryBuilder().AddExactMatches(search.DeploymentID, deploymentIDs...).ProtoQuery() + return m.processesDataStore.WalkByQuery(ctx, query, fn) } if err := pgutils.RetryIfPostgres(ctx, walkFn); err != nil { utils.Should(errors.Wrap(err, "error building indicator filter")) diff --git a/central/pod/datastore/datastore_impl_real_test.go b/central/pod/datastore/datastore_impl_real_test.go index 101424dbd2ece..5c626958361de 100644 --- a/central/pod/datastore/datastore_impl_real_test.go +++ b/central/pod/datastore/datastore_impl_real_test.go @@ -72,7 +72,7 @@ func (s *PodDatastoreSuite) SetupTest() { func (s *PodDatastoreSuite) getProcessIndicatorsFromDB() []*storage.ProcessIndicator { indicatorsFromDB := []*storage.ProcessIndicator{} - err := s.indicatorDataStore.WalkAll(s.plopAndPiCtx, + err := s.indicatorDataStore.WalkByQuery(s.plopAndPiCtx, nil, func(processIndicator *storage.ProcessIndicator) error { indicatorsFromDB = append(indicatorsFromDB, processIndicator) return nil diff --git a/central/processindicator/datastore/datastore.go b/central/processindicator/datastore/datastore.go index 3a42f5df41948..516cb08758030 100644 --- a/central/processindicator/datastore/datastore.go +++ b/central/processindicator/datastore/datastore.go @@ -35,7 +35,7 @@ type DataStore interface { RemoveProcessIndicators(ctx context.Context, ids []string) error PruneProcessIndicators(ctx context.Context, ids []string) (int, error) - WalkAll(ctx context.Context, fn func(pi *storage.ProcessIndicator) error) error + WalkByQuery(ctx context.Context, query *v1.Query, fn func(obj *storage.ProcessIndicator) error) error // IterateOverProcessIndicatorsRiskView iterates over minimal fields from process indicator for risk evaluation IterateOverProcessIndicatorsRiskView(ctx context.Context, q *v1.Query, fn func(*views.ProcessIndicatorRiskView) error) error diff --git a/central/processindicator/datastore/datastore_impl.go b/central/processindicator/datastore/datastore_impl.go index 50e7490ab1938..051860524aec5 100644 --- a/central/processindicator/datastore/datastore_impl.go +++ b/central/processindicator/datastore/datastore_impl.go @@ -116,14 +116,14 @@ func (ds *datastoreImpl) AddProcessIndicators(ctx context.Context, indicators .. return nil } -func (ds *datastoreImpl) WalkAll(ctx context.Context, fn func(pi *storage.ProcessIndicator) error) error { +func (ds *datastoreImpl) WalkByQuery(ctx context.Context, q *v1.Query, fn func(pi *storage.ProcessIndicator) error) error { if ok, err := deploymentExtensionSAC.ReadAllowed(ctx); err != nil { return err } else if !ok { return sac.ErrResourceAccessDenied } - return ds.storage.Walk(ctx, fn) + return ds.storage.WalkByQuery(ctx, q, fn) } func (ds *datastoreImpl) RemoveProcessIndicators(ctx context.Context, ids []string) error { diff --git a/central/processindicator/datastore/mocks/datastore.go b/central/processindicator/datastore/mocks/datastore.go index 54a51db3d8535..55b8c9f491b6f 100644 --- a/central/processindicator/datastore/mocks/datastore.go +++ b/central/processindicator/datastore/mocks/datastore.go @@ -224,16 +224,16 @@ func (mr *MockDataStoreMockRecorder) Wait(cancelWhen any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Wait", reflect.TypeOf((*MockDataStore)(nil).Wait), cancelWhen) } -// WalkAll mocks base method. -func (m *MockDataStore) WalkAll(ctx context.Context, fn func(*storage.ProcessIndicator) error) error { +// WalkByQuery mocks base method. +func (m *MockDataStore) WalkByQuery(ctx context.Context, query *v1.Query, fn func(*storage.ProcessIndicator) error) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WalkAll", ctx, fn) + ret := m.ctrl.Call(m, "WalkByQuery", ctx, query, fn) ret0, _ := ret[0].(error) return ret0 } -// WalkAll indicates an expected call of WalkAll. -func (mr *MockDataStoreMockRecorder) WalkAll(ctx, fn any) *gomock.Call { +// WalkByQuery indicates an expected call of WalkByQuery. +func (mr *MockDataStoreMockRecorder) WalkByQuery(ctx, query, fn any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalkAll", reflect.TypeOf((*MockDataStore)(nil).WalkAll), ctx, fn) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalkByQuery", reflect.TypeOf((*MockDataStore)(nil).WalkByQuery), ctx, query, fn) } diff --git a/central/processindicator/store/mocks/store.go b/central/processindicator/store/mocks/store.go index 4a93fcdd2e8d4..17d0894672c04 100644 --- a/central/processindicator/store/mocks/store.go +++ b/central/processindicator/store/mocks/store.go @@ -175,3 +175,17 @@ func (mr *MockStoreMockRecorder) Walk(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Walk", reflect.TypeOf((*MockStore)(nil).Walk), arg0, arg1) } + +// WalkByQuery mocks base method. +func (m *MockStore) WalkByQuery(arg0 context.Context, arg1 *v1.Query, arg2 func(*storage.ProcessIndicator) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalkByQuery", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// WalkByQuery indicates an expected call of WalkByQuery. +func (mr *MockStoreMockRecorder) WalkByQuery(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalkByQuery", reflect.TypeOf((*MockStore)(nil).WalkByQuery), arg0, arg1, arg2) +} diff --git a/central/processindicator/store/store.go b/central/processindicator/store/store.go index e84ad6597e44f..17ef4d6bbfef2 100644 --- a/central/processindicator/store/store.go +++ b/central/processindicator/store/store.go @@ -23,5 +23,6 @@ type Store interface { DeleteMany(ctx context.Context, id []string) error Walk(context.Context, func(pi *storage.ProcessIndicator) error) error + WalkByQuery(context.Context, *v1.Query, func(pi *storage.ProcessIndicator) error) error DeleteByQuery(ctx context.Context, query *v1.Query) error } diff --git a/central/processlisteningonport/datastore/datastore_impl_test.go b/central/processlisteningonport/datastore/datastore_impl_test.go index 4e9bc514bdc5e..063e564f4ef9c 100644 --- a/central/processlisteningonport/datastore/datastore_impl_test.go +++ b/central/processlisteningonport/datastore/datastore_impl_test.go @@ -91,7 +91,7 @@ func (suite *PLOPDataStoreTestSuite) getPlopsFromDB() []*storage.ProcessListenin func (suite *PLOPDataStoreTestSuite) getProcessIndicatorsFromDB() []*storage.ProcessIndicator { indicatorsFromDB := []*storage.ProcessIndicator{} - err := suite.indicatorDataStore.WalkAll(suite.hasWriteCtx, + err := suite.indicatorDataStore.WalkByQuery(suite.hasWriteCtx, nil, func(processIndicator *storage.ProcessIndicator) error { indicatorsFromDB = append(indicatorsFromDB, processIndicator) return nil