Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions sensor/common/clusterentities/store_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,18 @@ func (e *endpointsStore) deleteFromCurrent(deploymentID string, ep net.NumericEn
}
}

// addToHistory adds endpoint data to the history, but does not remove it from the current map
// addToHistory records history for one <deployment, endpoint> pair in linear time relative
// to the endpoint's target-info cardinality.
//
// Complexity:
// - O(T) for one call, where T = number of EndpointTargetInfo entries for this endpoint.
// - During purge of a deployment with M endpoints, total work scales as O(sum(T_i)) plus O(M)
// reverse-map updates, avoiding any M-by-M scan.
//
// This routine is performance-critical for large clusters with many nodes and NodePort/LoadBalancer
// service expansions, where endpoint cardinality can become very high. Its complexity directly
// impacts how long the endpointsStore mutex is held during Apply(), and therefore affects Sensor
// throughput and event pipeline latency.
func (e *endpointsStore) addToHistory(deploymentID string, ep net.NumericEndpoint) {
// Prepare maps if empty
if _, ok := e.historicalEndpoints[ep]; !ok {
Expand All @@ -270,9 +281,7 @@ func (e *endpointsStore) addToHistory(deploymentID string, ep net.NumericEndpoin
if _, ok := e.reverseHistoricalEndpoints[deploymentID]; !ok {
e.reverseHistoricalEndpoints[deploymentID] = make(map[net.NumericEndpoint]*entityStatus)
}
for numEp := range e.reverseEndpointMap[deploymentID] {
e.reverseHistoricalEndpoints[deploymentID][numEp] = newHistoricalEntity(e.memorySize)
}
e.reverseHistoricalEndpoints[deploymentID][ep] = newHistoricalEntity(e.memorySize)
}

func (e *endpointsStore) String() string {
Expand Down
73 changes: 73 additions & 0 deletions sensor/common/clusterentities/store_endpoints_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package clusterentities

import (
"fmt"
"testing"

"github.com/stackrox/rox/pkg/net"
"github.com/stackrox/rox/pkg/set"
)

// benchmarkSeedEndpointsStore builds an endpointsStore pre-populated with numEndpoints
// endpoints, all owned by a single benchmark deployment. It returns the store, the
// deployment ID, and the first endpoint inserted (the argument for addToHistory calls).
func benchmarkSeedEndpointsStore(numEndpoints int) (*endpointsStore, string, net.NumericEndpoint) {
	const deploymentID = "depl-bench"
	store := newEndpointsStoreWithMemory(5)

	endpoints := set.NewSet[net.NumericEndpoint]()
	var first net.NumericEndpoint
	for idx := 0; idx < numEndpoints; idx++ {
		// Spread endpoints across distinct 10.x.y.z addresses so each one is unique.
		octetHi := (idx / 65536) % 256
		octetMid := (idx / 256) % 256
		octetLo := idx % 256
		ep := buildEndpoint(fmt.Sprintf("10.%d.%d.%d", octetHi, octetMid, octetLo), 80)
		if idx == 0 {
			first = ep
		}
		targets := set.NewSet[EndpointTargetInfo]()
		targets.Add(EndpointTargetInfo{
			ContainerPort: 80,
			PortName:      "http",
		})
		store.endpointMap[ep] = map[string]set.Set[EndpointTargetInfo]{deploymentID: targets}
		endpoints.Add(ep)
	}
	store.reverseEndpointMap[deploymentID] = endpoints

	return store, deploymentID, first
}

// BenchmarkEndpointsStoreAddToHistory measures a single addToHistory call against stores
// seeded with a growing number of endpoints. With the per-endpoint implementation the
// per-op cost should stay flat as the seeded endpoint count increases.
func BenchmarkEndpointsStoreAddToHistory(b *testing.B) {
	benchCases := []struct {
		name         string
		numEndpoints int
	}{
		{name: "endpoints_100", numEndpoints: 100},
		{name: "endpoints_1000", numEndpoints: 1000},
		{name: "endpoints_5000", numEndpoints: 5000},
	}
	for _, bc := range benchCases {
		b.Run(bc.name, func(b *testing.B) {
			store, deploymentID, endpoint := benchmarkSeedEndpointsStore(bc.numEndpoints)
			b.ResetTimer()
			for b.Loop() {
				// Keep the current-map shape constant and measure addToHistory only.
				store.historicalEndpoints = make(map[net.NumericEndpoint]map[string]map[EndpointTargetInfo]*entityStatus)
				store.reverseHistoricalEndpoints = make(map[string]map[net.NumericEndpoint]*entityStatus)
				store.addToHistory(deploymentID, endpoint)
			}
		})
	}
}

// legacy is the version used in 4.10.0 and earlier (before backporting the fix).
/*
Running tool: /usr/local/go/bin/go test -test.fullpath=true -benchmem -run=^$ -bench ^BenchmarkEndpointsStoreAddToHistory$ github.com/stackrox/rox/sensor/common/clusterentities -count=1

goos: darwin
goarch: arm64
pkg: github.com/stackrox/rox/sensor/common/clusterentities
cpu: Apple M3 Pro
BenchmarkEndpointsStoreAddToHistory/endpoints_100-12 1656640 695.7 ns/op 2100 B/op 12 allocs/op
BenchmarkEndpointsStoreAddToHistory/legacy_endpoints_100-12 129315 9212 ns/op 20226 B/op 120 allocs/op
BenchmarkEndpointsStoreAddToHistory/endpoints_1000-12 1849548 650.1 ns/op 2100 B/op 12 allocs/op
BenchmarkEndpointsStoreAddToHistory/legacy_endpoints_1000-12 10000 115167 ns/op 302780 B/op 1031 allocs/op
BenchmarkEndpointsStoreAddToHistory/endpoints_5000-12 1718923 703.6 ns/op 2100 B/op 12 allocs/op
BenchmarkEndpointsStoreAddToHistory/legacy_endpoints_5000-12 2221 535328 ns/op 1195998 B/op 5057 allocs/op
PASS
ok github.com/stackrox/rox/sensor/common/clusterentities 7.895s
*/
21 changes: 21 additions & 0 deletions sensor/common/clusterentities/store_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,24 @@ func buildExpectation(ip, deplID, portName string, location whereThingIsStored,
},
}
}

// TestEndpointTakeoverFastPathDoesNotPolluteReverseHistory verifies that when one
// deployment takes over a single endpoint from another, only the taken-over endpoint
// lands in the previous owner's reverse history; the previous owner's other endpoints
// must not be marked historical by the single-endpoint takeover path.
func (s *ClusterEntitiesStoreTestSuite) TestEndpointTakeoverFastPathDoesNotPolluteReverseHistory() {
	entityStore := NewStore(5, nil, true)

	sharedEp := buildEndpoint("10.0.0.1", 80)
	privateEp := buildEndpoint("10.0.0.2", 80)

	targetInfo := EndpointTargetInfo{ContainerPort: 80, PortName: "http"}

	// deplA initially owns both endpoints.
	previousOwner := &EntityData{}
	previousOwner.AddEndpoint(sharedEp, targetInfo)
	previousOwner.AddEndpoint(privateEp, targetInfo)
	entityStore.Apply(map[string]*EntityData{"deplA": previousOwner}, true)

	// deplB takes over only the shared endpoint.
	newOwner := &EntityData{}
	newOwner.AddEndpoint(sharedEp, targetInfo)
	entityStore.Apply(map[string]*EntityData{"deplB": newOwner}, true)

	histForA, ok := entityStore.endpointsStore.reverseHistoricalEndpoints["deplA"]
	s.True(ok, "deplA should have history entry after endpoint takeover")
	s.Contains(histForA, sharedEp, "taken-over endpoint must be historical for previous owner")
	s.NotContains(histForA, privateEp, "unrelated endpoint must not be marked historical during single-endpoint takeover")
}
Loading