From db192e6c92867e7084e306d1ac404f3a175a138d Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 24 Mar 2026 15:57:57 +0100 Subject: [PATCH] ROX-33758: Fix quadratic runtime in `endpointsStore.addToHistory` (#19566) (cherry picked from commit bfd67f21c5d10c414e3711e792bc4428e566f146) --- .../common/clusterentities/store_endpoints.go | 17 ++++- .../store_endpoints_benchmark_test.go | 73 +++++++++++++++++++ .../clusterentities/store_endpoints_test.go | 21 ++++++ 3 files changed, 107 insertions(+), 4 deletions(-) create mode 100644 sensor/common/clusterentities/store_endpoints_benchmark_test.go diff --git a/sensor/common/clusterentities/store_endpoints.go b/sensor/common/clusterentities/store_endpoints.go index 00ecf7e36aa9f..7ebfb1d614541 100644 --- a/sensor/common/clusterentities/store_endpoints.go +++ b/sensor/common/clusterentities/store_endpoints.go @@ -254,7 +254,18 @@ func (e *endpointsStore) deleteFromCurrent(deploymentID string, ep net.NumericEn } } -// addToHistory adds endpoint data to the history, but does not remove it from the current map +// addToHistory records history for one pair in linear time relative +// to the endpoint's target-info cardinality. +// +// Complexity: +// - O(T) for one call, where T = number of EndpointTargetInfo entries for this endpoint. +// - During purge of a deployment with M endpoints, total work scales as O(sum(T_i)) plus O(M) +// reverse-map updates, avoiding any M-by-M scan. +// +// This routine is performance-critical for large clusters with many nodes and NodePort/LoadBalancer +// service expansions, where endpoint cardinality can become very high. Its complexity directly +// impacts how long endpointsStore mutex is held during Apply(), and therefore affects Sensor +// throughput and event pipeline latency. func (e *endpointsStore) addToHistory(deploymentID string, ep net.NumericEndpoint) { // Prepare maps if empty if _, ok := e.historicalEndpoints[ep]; !ok { @@ -270,9 +281,7 @@ func (e *endpointsStore) addToHistory(deploymentID string, ep net.NumericEndpoin if _, ok := e.reverseHistoricalEndpoints[deploymentID]; !ok { e.reverseHistoricalEndpoints[deploymentID] = make(map[net.NumericEndpoint]*entityStatus) } - for numEp := range e.reverseEndpointMap[deploymentID] { - e.reverseHistoricalEndpoints[deploymentID][numEp] = newHistoricalEntity(e.memorySize) - } + e.reverseHistoricalEndpoints[deploymentID][ep] = newHistoricalEntity(e.memorySize) } func (e *endpointsStore) String() string { diff --git a/sensor/common/clusterentities/store_endpoints_benchmark_test.go b/sensor/common/clusterentities/store_endpoints_benchmark_test.go new file mode 100644 index 0000000000000..a5d1bad1fec39 --- /dev/null +++ b/sensor/common/clusterentities/store_endpoints_benchmark_test.go @@ -0,0 +1,73 @@ +package clusterentities + +import ( + "fmt" + "testing" + + "github.com/stackrox/rox/pkg/net" + "github.com/stackrox/rox/pkg/set" +) + +func benchmarkSeedEndpointsStore(numEndpoints int) (*endpointsStore, string, net.NumericEndpoint) { + const deploymentID = "depl-bench" + store := newEndpointsStoreWithMemory(5) + epSet := set.NewSet[net.NumericEndpoint]() + + var firstEndpoint net.NumericEndpoint + for i := range numEndpoints { + ep := buildEndpoint(fmt.Sprintf("10.%d.%d.%d", (i/65536)%256, (i/256)%256, i%256), 80) + if i == 0 { + firstEndpoint = ep + } + etiSet := set.NewSet[EndpointTargetInfo]() + etiSet.Add(EndpointTargetInfo{ + ContainerPort: 80, + PortName: "http", + }) + store.endpointMap[ep] = map[string]set.Set[EndpointTargetInfo]{deploymentID: etiSet} + epSet.Add(ep) + } + store.reverseEndpointMap[deploymentID] = epSet + + return store, deploymentID, firstEndpoint +} + +func BenchmarkEndpointsStoreAddToHistory(b *testing.B) { + for _, tc := range []struct { + name string + numEndpoints int + }{ + {name: "endpoints_100", numEndpoints: 100}, + {name: "endpoints_1000", numEndpoints: 1000}, + {name: "endpoints_5000", numEndpoints: 5000}, + } { + b.Run(tc.name, func(b *testing.B) { + store, deploymentID, endpoint := benchmarkSeedEndpointsStore(tc.numEndpoints) + b.ResetTimer() + for b.Loop() { + // Keep the current-map shape constant and measure addToHistory only. + store.historicalEndpoints = make(map[net.NumericEndpoint]map[string]map[EndpointTargetInfo]*entityStatus) + store.reverseHistoricalEndpoints = make(map[string]map[net.NumericEndpoint]*entityStatus) + store.addToHistory(deploymentID, endpoint) + } + }) + } +} + +// legacy is the version used in 4.10.0 and earlier (before backporting the fix). +/* +Running tool: /usr/local/go/bin/go test -test.fullpath=true -benchmem -run=^$ -bench ^BenchmarkEndpointsStoreAddToHistory$ github.com/stackrox/rox/sensor/common/clusterentities -count=1 + +goos: darwin +goarch: arm64 +pkg: github.com/stackrox/rox/sensor/common/clusterentities +cpu: Apple M3 Pro +BenchmarkEndpointsStoreAddToHistory/endpoints_100-12 1656640 695.7 ns/op 2100 B/op 12 allocs/op +BenchmarkEndpointsStoreAddToHistory/legacy_endpoints_100-12 129315 9212 ns/op 20226 B/op 120 allocs/op +BenchmarkEndpointsStoreAddToHistory/endpoints_1000-12 1849548 650.1 ns/op 2100 B/op 12 allocs/op +BenchmarkEndpointsStoreAddToHistory/legacy_endpoints_1000-12 10000 115167 ns/op 302780 B/op 1031 allocs/op +BenchmarkEndpointsStoreAddToHistory/endpoints_5000-12 1718923 703.6 ns/op 2100 B/op 12 allocs/op +BenchmarkEndpointsStoreAddToHistory/legacy_endpoints_5000-12 2221 535328 ns/op 1195998 B/op 5057 allocs/op +PASS +ok github.com/stackrox/rox/sensor/common/clusterentities 7.895s +*/ diff --git a/sensor/common/clusterentities/store_endpoints_test.go b/sensor/common/clusterentities/store_endpoints_test.go index dcf43fe36308f..00414ce80e621 100644 --- a/sensor/common/clusterentities/store_endpoints_test.go +++ b/sensor/common/clusterentities/store_endpoints_test.go @@ -327,3 +327,24 @@ func buildExpectation(ip, deplID, portName string, location whereThingIsStored, }, } } + +func (s *ClusterEntitiesStoreTestSuite) TestEndpointTakeoverFastPathDoesNotPolluteReverseHistory() { + store := NewStore(5, nil, true) + + ep1 := buildEndpoint("10.0.0.1", 80) + ep2 := buildEndpoint("10.0.0.2", 80) + + deplA := &EntityData{} + deplA.AddEndpoint(ep1, EndpointTargetInfo{ContainerPort: 80, PortName: "http"}) + deplA.AddEndpoint(ep2, EndpointTargetInfo{ContainerPort: 80, PortName: "http"}) + store.Apply(map[string]*EntityData{"deplA": deplA}, true) + + deplB := &EntityData{} + deplB.AddEndpoint(ep1, EndpointTargetInfo{ContainerPort: 80, PortName: "http"}) + store.Apply(map[string]*EntityData{"deplB": deplB}, true) + + reverseHistDeplA, ok := store.endpointsStore.reverseHistoricalEndpoints["deplA"] + s.True(ok, "deplA should have history entry after endpoint takeover") + s.Contains(reverseHistDeplA, ep1, "taken-over endpoint must be historical for previous owner") + s.NotContains(reverseHistDeplA, ep2, "unrelated endpoint must not be marked historical during single-endpoint takeover") +}