ROX-32316: Add central rate limiter for VM index reports (#18220)
Conversation
Codecov Report ❌ — patch coverage and impacted files:
@@ Coverage Diff @@
## master #18220 +/- ##
==========================================
+ Coverage 49.06% 49.13% +0.07%
==========================================
Files 2650 2653 +3
Lines 198921 199265 +344
==========================================
+ Hits 97592 97904 +312
- Misses 93896 93918 +22
- Partials 7433 7443 +10
Flags with carried forward coverage won't be shown.
/retest
Hey - I've found 4 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `pkg/rate/limiter.go:99-100` </location>
<code_context>
+// TryConsume attempts to consume one token for the given client.
+// Returns true if allowed, false if rate limit exceeded.
+// Metrics are automatically recorded.
+func (l *Limiter) TryConsume(clientID string) (allowed bool, reason string) {
+ if l.globalRate <= 0 {
+ // Rate limiting disabled
+ return true, ""
</code_context>
<issue_to_address>
**suggestion:** Unlimited mode skips all metrics, which may obscure visibility into workload volume.
When `globalRate <= 0`, `TryConsume` returns `true` without updating `RequestsTotal` (or any metrics). That preserves rate-limiting semantics but drops visibility into request volume in “unlimited” mode. If metrics are used to track load (not just rejections), consider still incrementing `RequestsTotal` (and possibly `RequestsAccepted`) when rate limiting is disabled so dashboards remain useful.
</issue_to_address>
### Comment 2
<location> `pkg/rate/metrics.go:38-46` </location>
<code_context>
+ []string{"workload", "outcome"},
+ )
+
+ // RequestsAccepted tracks requests accepted by the rate limiter (per client).
+ RequestsAccepted = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: metrics.PrometheusNamespace,
+ Subsystem: metrics.CentralSubsystem.String(),
+ Name: "rate_limiter_requests_accepted_total",
+ Help: "Requests accepted by the rate limiter",
+ },
+ []string{"workload", "client_id"},
+ )
+
</code_context>
<issue_to_address>
**suggestion (performance):** Per-client metrics keyed by client_id may introduce high-cardinality Prometheus series.
`RequestsAccepted` and `RequestsRejected` are labeled by `client_id` (cluster ID), which could lead to an unbounded number of series over a Central instance’s lifetime and strain Prometheus. If that’s realistic for your deployments, consider either limiting these per-client metrics to debug builds, sampling/aggregating client IDs (e.g., top-N or hashing), or dropping the `client_id` label from always-on metrics and relying on `ActiveClients` + overall `RequestsTotal`, with per-client detail available via logs or ad‑hoc tooling.
</issue_to_address>
### Comment 3
<location> `pkg/rate/limiter_test.go:93` </location>
<code_context>
+
+func TestTryConsume_Disabled(t *testing.T) {
+ limiter := mustNewLimiter(t, "test", 0, 5)
+ for i := range 100 {
+ allowed, reason := limiter.TryConsume("test-cluster")
+ assert.True(t, allowed, "request %d should be allowed when rate limiting is disabled", i)
</code_context>
<issue_to_address>
**issue (bug_risk):** Loop syntax `for i := range 100` requires Go 1.22+; use a counted loop if older toolchains must build the tests
In this file there are several loops like `for i := range 100 {`. Ranging over an integer is only valid since Go 1.22, so on older toolchains the tests will not compile. If the module targets a Go version below 1.22, replace them with a standard counted loop, e.g. `for i := 0; i < 100; i++ {`, and update all occurrences so the tests build and run everywhere.
</issue_to_address>
### Comment 4
<location> `pkg/rate/registry.go:8` </location>
<code_context>
+)
+
+var (
+ registry = make(map[string]*Limiter)
+ registryLock sync.RWMutex
+)
</code_context>
<issue_to_address>
**issue (complexity):** Consider removing the global limiter registry in favor of constructing and injecting limiters locally per workload or pipeline.
The new registry does add non‑trivial global complexity (shared mutable state, locks, test reset hook) for behavior that can be achieved with simpler, local construction. You can keep all functionality (including reusing a limiter per workload) without a global singleton.
### 1. Drop the global registry and expose a constructor
Instead of `RegisterLimiter`/`GetLimiter`/`OnClientDisconnectAll`/`ResetForTesting`, keep a simple constructor and let callers own the limiter:
```go
// rate/limiter_factory.go
package rate
// NewWorkloadLimiter constructs a new limiter for a given workload.
func NewWorkloadLimiter(workloadName string, globalRate float64, bucketCapacity int) (*Limiter, error) {
return NewLimiter(workloadName, globalRate, bucketCapacity)
}
```
Pipeline usage:
```go
// vm_index_pipeline.go
limiter, err := rate.NewWorkloadLimiter("vm_index_report", rateLimit, bucketCapacity)
if err != nil {
return nil, err
}
p := &pipeline{
rateLimiter: limiter,
}
func (p *pipeline) OnFinish(clusterID string) {
p.rateLimiter.OnClientDisconnect(clusterID)
}
```
This removes:
- `registry` map and locks
- `RegisterLimiter`, `GetLimiter`, `OnClientDisconnectAll`, `ResetForTesting`
while keeping:
- one limiter instance per pipeline (or per workload, as you decide)
- explicit `OnClientDisconnect` semantics
### 2. If you truly need shared limiters, inject them instead of globals
If sharing a limiter across multiple callers is a requirement, create it at composition time and inject:
```go
// composition.go
vmIndexLimiter, err := rate.NewWorkloadLimiter("vm_index_report", rateLimit, bucketCapacity)
if err != nil {
return err
}
pipeline := NewVMPipeline(vmIndexLimiter)
otherConsumer := NewOtherConsumer(vmIndexLimiter)
```
This still avoids global registry state and test resets; tests can construct/destroy limiters as needed:
```go
func TestPipelineRateLimiting(t *testing.T) {
limiter, _ := rate.NewWorkloadLimiter("test_workload", 10, 100)
p := NewVMPipeline(limiter)
// ...
}
```
These changes preserve functionality while significantly reducing indirection, global state, and test‑only hooks.
</issue_to_address>
@sourcery-ai check again

Sure! I'm generating a new review now.
Hey - I've found 6 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `pkg/rate/limiter.go:99-108` </location>
<code_context>
+// TryConsume attempts to consume one token for the given client.
+// Returns true if allowed, false if rate limit exceeded.
+// Metrics are automatically recorded.
+func (l *Limiter) TryConsume(clientID string) (allowed bool, reason string) {
+ if l.globalRate <= 0 {
+ // Rate limiting disabled, but still record metrics for visibility into request volume.
+ RequestsTotal.WithLabelValues(l.workloadName, OutcomeAccepted).Inc()
+ return true, ""
+ }
+
+ limiter := l.getOrCreateLimiter(clientID)
+
+ // Use AllowN with clock.Now() to support time injection for testing.
+ if limiter.AllowN(l.clock.Now(), 1) {
+ RequestsTotal.WithLabelValues(l.workloadName, OutcomeAccepted).Inc()
+ return true, ""
</code_context>
<issue_to_address>
**issue:** Add basic validation/handling for empty clientID in TryConsume to avoid a shared bucket hotspot
Right now an empty clientID gets its own shared bucket under the "" key, so any caller that forgets to pass a real client ID will be lumped together, breaking per-client fairness and making behavior surprising. Consider either rejecting empty client IDs with a clear error/reason, or explicitly treating them as unlimited (no rate limiting, but still metrics) so the misuse is obvious and you avoid a silent hotspot.
</issue_to_address>
### Comment 2
<location> `pkg/rate/limiter.go:138-139` </location>
<code_context>
+ return actual.(*gorate.Limiter)
+ }
+
+ log.Infof("New client %s registered for %s rate limiting (clients: %d, rate: %.2f req/s, max bucket capacity: %d)",
+ clientID, l.workloadName, numClients, perClientRate, bucketCapacity)
+
+ // Rebalance all existing limiters with new client count
</code_context>
<issue_to_address>
**suggestion (performance):** Reconsider using Info level for per-client registration to avoid log noise in large deployments
Logging each new client at Info with full rate/bucket details can be very noisy in environments with many clusters or short‑lived clients, and may obscure more important operational logs. Consider moving this to Debug (or adding sampling) while relying on aggregate Prometheus metrics for visibility by default.
```suggestion
log.Debugf("New client %s registered for %s rate limiting (clients: %d, rate: %.2f req/s, max bucket capacity: %d)",
clientID, l.workloadName, numClients, perClientRate, bucketCapacity)
```
</issue_to_address>
### Comment 3
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:400-409` </location>
<code_context>
+func TestPipelineRun_RateLimitEnabled(t *testing.T) {
</code_context>
<issue_to_address>
**suggestion (testing):** Extend rate limit test to assert that the rate-limited request does not perform enrichment or datastore writes and that a NACK is sent when an injector is present
The existing test confirms 5 successful requests and that the 6th returns no error, but it doesn’t actually assert that the 6th request is rate-limited at the pipeline level. Please also assert that `EnrichVirtualMachineWithVulnerabilities` and `UpdateVirtualMachineScan` are not invoked on the 6th request (e.g., by using `Times(5)` or moving expectations outside the loop). In addition, use a mocked `MessageInjector` with a context that has `SensorACKSupport` so you can verify that a `SensorACK` with `Action=NACK` and `MessageType=VM_INDEX_REPORT` is injected for the rate-limited request.
Suggested implementation:
```golang
// TestPipelineRun_RateLimitEnabled tests that rate limiting rejects requests when enabled
// and ensures that rate-limited requests do not perform enrichment/datastore writes while
// emitting a NACK SensorACK when an injector is present.
func TestPipelineRun_RateLimitEnabled(t *testing.T) {
t.Setenv(features.VirtualMachines.EnvVar(), "true")
ctrl := gomock.NewController(t)
defer ctrl.Finish()
vmDatastore := vmDatastoreMocks.NewMockDataStore(ctrl)
enricher := vmEnricherMocks.NewMockVirtualMachineEnricher(ctrl)
injector := pipelineMocks.NewMockMessageInjector(ctrl)
// Allow 5 requests, bucket size 5 – the 6th should be rate-limited.
rateLimiter := mustNewLimiter(t, "test", 5, 5)
pipeline := &pipelineImpl{
vmDatastore: vmDatastore,
enricher: enricher,
rateLimiter: rateLimiter,
injector: injector,
}
const (
testClusterID = "cluster-id"
vmID = "vm-id"
)
// Build a context that enables Sensor ACK support so that the pipeline will
// send ACK/NACK messages via the injector.
baseCtx := context.Background()
ctx := sensorCtxWithACKSupport(baseCtx)
// We expect enrichment and datastore writes only for the first 5 (non-rate-limited) requests.
enricher.EXPECT().
EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()).
Times(5)
vmDatastore.EXPECT().
UpdateVirtualMachineScan(gomock.Any(), vmID, gomock.Any()).
Times(5)
// Exactly one NACK should be injected for the 6th (rate-limited) request.
injector.EXPECT().
InjectMessage(gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, msg *central.MsgToSensor) error {
ack := msg.GetSensorAck()
require.NotNil(t, ack, "expected SensorACK message for rate-limited request")
assert.Equal(t, central.SensorAck_NACK, ack.GetAction(), "rate-limited request should be NACKed")
assert.Equal(t, central.SensorAck_VM_INDEX_REPORT, ack.GetMessageType(), "NACK should be for VM_INDEX_REPORT")
return nil
}).
Times(1)
// Send 6 requests – the first 5 should be processed successfully,
// the 6th should be rate-limited.
for i := 0; i < 6; i++ {
msg := buildVMIndexReportMsg(vmID)
err := pipeline.Run(ctx, testClusterID, msg, injector)
assert.NoError(t, err, "Run should not return an error even when rate-limited (request %d)", i+1)
}
}
```
Because only a portion of the file was visible, you will likely need to adjust the following to fit your existing codebase:
1. **MessageInjector mock and field name**
- Replace `pipelineMocks.NewMockMessageInjector(ctrl)` with the actual mock constructor you use for the `MessageInjector` interface (e.g. it might live under a different mocks package).
- Ensure the `pipelineImpl` struct actually has an `injector` (or similarly named) field accepting a `MessageInjector`. If the injector is passed differently (e.g. as a parameter or via another struct), wire it up accordingly.
2. **Context helper for SensorACK support**
- Implement or use your existing helper `sensorCtxWithACKSupport(baseCtx)` to return a context with `SensorACKSupport` enabled. If you already have a helper for this in other tests (e.g. in another pipeline test file), reuse that instead of introducing a new one.
3. **Message builders and enums**
- Implement or reuse `buildVMIndexReportMsg(vmID)` so it returns the same message type your VM index pipeline normally receives. In existing tests you might already have a helper that builds VM index reports; if so, use that.
- Adjust enum names used in the ASSERTs to match your actual protobuf types:
- `central.SensorAck_NACK` should be the correct enum for a NACK action.
- `central.SensorAck_VM_INDEX_REPORT` should match your VM index report message type enum (if the exact name differs in your proto, update accordingly).
4. **Imports**
- Ensure the test file imports any additional packages used above:
- `context`
- Your `pipelineMocks` (or equivalent) package for the `MessageInjector` mock.
- `github.com/stackrox/rox/generated/internalapi/central` (or your actual internal API import path) for `central.MsgToSensor` and `central.SensorAck` types.
- `github.com/stretchr/testify/require` if not already imported for `require.NotNil`.
- Remove or update any now-unused imports.
5. **Rate limiter wiring**
- Confirm that `mustNewLimiter` and the `rateLimiter` field on `pipelineImpl` correspond to the actual rate-limiting implementation in this pipeline. If the field name or type differs, adjust accordingly.
6. **Signature of `pipeline.Run`**
- The call `pipeline.Run(ctx, testClusterID, msg, injector)` assumes the 4th argument is an injector (or compatible type). If your `Run` signature differs, adapt the call and where the injector is passed so that the test actually exercises the NACK path for rate-limited requests.
</issue_to_address>
### Comment 4
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:366-375` </location>
<code_context>
assert.NoError(t, err)
}
+
+// TestPipelineRun_RateLimitDisabled tests that rate limiting is disabled when configured with 0
+func TestPipelineRun_RateLimitDisabled(t *testing.T) {
+ t.Setenv(features.VirtualMachines.EnvVar(), "true")
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ vmDatastore := vmDatastoreMocks.NewMockDataStore(ctrl)
+ enricher := vmEnricherMocks.NewMockVirtualMachineEnricher(ctrl)
+ rateLimiter := mustNewLimiter(t, "test", 0, 50) // Disabled
+
+ pipeline := &pipelineImpl{
+ vmDatastore: vmDatastore,
+ enricher: enricher,
+ rateLimiter: rateLimiter,
+ }
+
+ vmID := "vm-1"
+ msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE)
+
+ // Should process all 100 requests without rate limiting
+ for i := 0; i < 100; i++ {
+ enricher.EXPECT().
+ EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()).
</code_context>
<issue_to_address>
**suggestion (testing):** Add coverage for behavior when the rate limiter is nil in the pipeline
There’s still an untested branch where `p.rateLimiter` is `nil` and the pipeline logs a warning and sends a NACK (when ACKs are supported) before dropping the report. Please add a test that builds a `pipelineImpl` with `rateLimiter: nil`, uses a context with a connection that has `SensorACKSupport` and a mocked `MessageInjector`, then calls `Run` and verifies that no enrichment/datastore calls occur and exactly one NACK with `MessageType=VM_INDEX_REPORT` is sent. This will cover the nil-limiter branch and guard against regressions.
Suggested implementation:
```golang
err := pipeline.Run(ctx, testClusterID, msg, nil)
assert.NoError(t, err, "request %d should succeed with rate limiting disabled", i)
}
}
// TestPipelineRun_NilRateLimiter_WithACKSupport tests behavior when the rateLimiter is nil and ACKs are supported.
func TestPipelineRun_NilRateLimiter_WithACKSupport(t *testing.T) {
t.Setenv(features.VirtualMachines.EnvVar(), "true")
ctrl := gomock.NewController(t)
defer ctrl.Finish()
// Mocks for datastore and enricher – no expectations should be set on these,
// because the pipeline must short‑circuit before doing any work.
vmDatastore := vmDatastoreMocks.NewMockDataStore(ctrl)
enricher := vmEnricherMocks.NewMockVirtualMachineEnricher(ctrl)
// Mock message injector that should receive exactly one NACK for VM_INDEX_REPORT.
msgInjector := messageInjectorMocks.NewMockMessageInjector(ctrl)
pipeline := &pipelineImpl{
vmDatastore: vmDatastore,
enricher: enricher,
// rateLimiter must be nil to cover the nil‑limiter branch.
rateLimiter: nil,
// Make sure the injector field name matches pipelineImpl's definition.
injector: msgInjector,
}
vmID := "vm-1"
msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE)
// Build a context that has a connection with SensorACKSupport enabled.
// The helper used here should attach a connection that advertises ACK support
// and uses msgInjector to send messages back to the sensor.
ctxWithConn := contextWithACKSupport(ctx, msgInjector)
// Expect exactly one NACK with MessageType = VM_INDEX_REPORT and no other messages.
msgInjector.EXPECT().
Send(gomock.Any()).
Do(func(m *central.MsgToSensor) {
ack := m.GetAckResponse()
require.NotNil(t, ack, "expected an AckResponse message")
assert.Equal(t, central.AckResponse_NACK, ack.GetStatus(), "expected NACK status")
assert.Equal(t, central.MsgToSensor_VM_INDEX_REPORT, ack.GetMessageType(), "expected VM_INDEX_REPORT message type")
}).
Times(1)
// Run the pipeline once; it should short‑circuit due to nil rateLimiter,
// emit a NACK, and not call any datastore/enricher methods.
err := pipeline.Run(ctxWithConn, testClusterID, msg, nil)
assert.NoError(t, err, "pipeline Run should not error when rateLimiter is nil")
}
```
To make this compile and integrate correctly, you will also need to:
1. **Import the message injector mocks**
Add an import at the top of `pipeline_test.go` for the message injector mock package, matching how mocks are organized in this repo, for example:
```go
messageInjectorMocks "github.com/stackrox/rox/central/sensor/service/pipeline/mocks"
```
or whatever the existing mock package path/name is in your codebase.
2. **Use the correct injector field name on `pipelineImpl`**
In the test, `injector: msgInjector,` assumes `pipelineImpl` has a field named `injector` of the appropriate interface type (e.g., `MessageInjector`).
If the field is named differently (e.g., `messageInjector` or `toSensor`), or if there is a constructor used elsewhere, update the field name / construction to match the existing struct definition and conventions.
3. **Use the actual context/connection helper for ACK support**
Replace `contextWithACKSupport(ctx, msgInjector)` with the real helper used in this test file (or package) to attach a connection that has `SensorACKSupport` enabled and uses `msgInjector`.
For example, if you already have something like:
```go
ctx := testContextWithACKSupport(t, msgInjector)
```
or a helper based on a `connection.ContextWithConnection(...)` API, use that instead.
4. **Use the actual injector method name and signature**
The example uses:
```go
msgInjector.EXPECT().
Send(gomock.Any()).
Do(func(m *central.MsgToSensor) { ... }).
Times(1)
```
Adjust this to match the real interface method (`InjectMessage`, `SendMessage`, etc.) and its parameters. For instance, if the method is:
```go
InjectMessage(ctx context.Context, msg *central.MsgToSensor) error
```
then the expectation should be:
```go
msgInjector.EXPECT().
InjectMessage(gomock.Any(), gomock.Any()).
Do(func(_ context.Context, m *central.MsgToSensor) { ... }).
Return(nil).
Times(1)
```
5. **Align AckResponse field accessors with your proto**
The code assumes:
```go
ack := m.GetAckResponse()
ack.GetStatus()
ack.GetMessageType()
```
If your generated proto uses different names (e.g., `GetMsgType` or nested enums), adjust the field and enum names accordingly while still asserting:
- `Status == NACK`
- `MessageType == VM_INDEX_REPORT`.
6. **Ensure no expectations on datastore/enricher**
The absence of `.EXPECT()` on `vmDatastore` and `enricher` intentionally verifies that no calls are made; if you want to be stricter, you can add:
```go
ctrl.Finish()
```
(already present) and/or `gomock.InOrder` with only the injector expectation, so any stray calls to the other mocks will fail the test.
These adjustments will ensure the new test compiles correctly, exercises the nil-rate-limiter branch, and verifies that the pipeline emits a single NACK for `VM_INDEX_REPORT` without performing enrichment or datastore updates.
</issue_to_address>
### Comment 5
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:188-197` </location>
<code_context>
func (suite *PipelineTestSuite) TestNewPipeline() {
mockDatastore := vmDatastoreMocks.NewMockDataStore(suite.mockCtrl)
mockEnricher := vmEnricherMocks.NewMockVirtualMachineEnricher(suite.mockCtrl)
- pipeline := newPipeline(mockDatastore, mockEnricher)
+ rateLimiter := mustNewLimiter(suite.T(), "test", 0, 50)
+ pipeline := newPipeline(mockDatastore, mockEnricher, rateLimiter)
suite.NotNil(pipeline)
impl, ok := pipeline.(*pipelineImpl)
suite.True(ok, "Should return pipelineImpl instance")
suite.Equal(mockDatastore, impl.vmDatastore)
suite.Equal(mockEnricher, impl.enricher)
+ suite.Equal(rateLimiter, impl.rateLimiter)
}
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test to verify that OnFinish propagates cluster disconnect to the rate limiter
Right now, there’s no test that checks `OnFinish` actually passes the cluster ID to the limiter. Please add a test that builds a `pipelineImpl` with a small fake/mocked limiter (with `OnClientDisconnect` capturing the last `clusterID`), calls `OnFinish("cluster-1")`, and asserts the limiter was called with that ID. This will protect the pipeline–limiter integration from regressions.
Suggested implementation:
```golang
var ctx = context.Background()
// mustNewLimiter creates a rate limiter or fails the test.
func mustNewLimiter(t require.TestingT, workloadName string, globalRate float64, bucketCapacity int) *rate.Limiter {
limiter, err := rate.NewLimiter(workloadName, globalRate, bucketCapacity)
require.NoError(t, err)
return limiter
}
// fakeRateLimiter is a lightweight test double that records the last
// cluster ID passed to OnClientDisconnect so we can verify propagation.
type fakeRateLimiter struct {
lastDisconnectedClusterID string
}
// OnClientDisconnect records the cluster ID; add any other methods needed
// to satisfy the limiter interface used by pipelineImpl.
func (f *fakeRateLimiter) OnClientDisconnect(clusterID string) {
f.lastDisconnectedClusterID = clusterID
}
func (suite *PipelineTestSuite) TestOnFinishPropagatesClusterDisconnect() {
// Use a fake limiter so we can observe calls to OnClientDisconnect.
fakeLimiter := &fakeRateLimiter{}
p := &pipelineImpl{
vmDatastore: suite.vmDatastore,
enricher: suite.enricher,
rateLimiter: fakeLimiter,
}
const clusterID = "cluster-1"
p.OnFinish(clusterID)
suite.Equal(clusterID, fakeLimiter.lastDisconnectedClusterID, "OnFinish should propagate cluster disconnect to the rate limiter")
}
func TestPipeline(t *testing.T) {
```
The above change assumes that:
1. The `rateLimiter` field on `pipelineImpl` is an interface type that is satisfied by any struct exposing `OnClientDisconnect(clusterID string)`. If the actual signature is different (e.g. includes a `context.Context` or returns an error), update `fakeRateLimiter.OnClientDisconnect` to match that exact signature.
2. The limiter interface may have additional methods (e.g. `Allow`, `Wait`, `OnClientConnect`, etc.). If compilation fails due to missing methods, add no-op implementations for those methods on `fakeRateLimiter` so it fully satisfies the interface used by `pipelineImpl`.
3. If `OnFinish` currently has a different signature (e.g. `OnFinish(ctx context.Context, clusterID string)`), adjust the call site in `TestOnFinishPropagatesClusterDisconnect` accordingly, passing a suitable context.
</issue_to_address>
### Comment 6
<location> `pkg/rate/limiter.go:36` </location>
<code_context>
+// Limiter provides per-client fair rate limiting for any workload type.
+// Each client gets an equal share (1/N) of the global capacity, with automatic
+// rebalancing when clients connect or disconnect.
+type Limiter struct {
+ workloadName string // name for logging/metrics (e.g., "vm_index_report")
+ globalRate float64 // requests per second (0 = unlimited)
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the sync.Map plus repeated bucket scans with a simple mutex-protected map and a tracked client count to streamline rebalancing logic and concurrency handling.
You can keep the fair-per-client semantics and dynamic rebalancing while simplifying concurrency and reducing iterations over the buckets.
### 1. Replace `sync.Map` + `countActiveClients` with a mutex-protected map and `numClients`
Right now, each new client:
- calls `countActiveClients` (one full `Range`),
- then `rebalanceLimiters`, which calls `countActiveClients` again and `Range`s again to mutate all limiters.
You can simplify by:
- using a `map[string]*rate.Limiter` guarded by a `sync.Mutex`,
- tracking `numClients` directly instead of recomputing it by iterating,
- rebalancing under the same mutex, with only a single `Range`-like iteration per change.
This reduces concurrency complexity and the number of full scans while preserving behavior.
```go
type Limiter struct {
workloadName string
globalRate float64
bucketCapacity int
mu sync.Mutex
buckets map[string]*gorate.Limiter // map[clientID]*rate.Limiter
numClients int
clock Clock
}
```
Initialize the map in the constructor:
```go
return &Limiter{
workloadName: workloadName,
globalRate: globalRate,
bucketCapacity: bucketCapacity,
clock: clock,
buckets: make(map[string]*gorate.Limiter),
}, nil
```
### 2. Simplify `getOrCreateLimiter` + rebalancing
Under the mutex:
- check if the limiter exists,
- if not, increment `numClients`, compute per-client rate/burst once,
- create the new limiter,
- update all existing limiters,
- release the lock.
```go
func (l *Limiter) getOrCreateLimiter(clientID string) *gorate.Limiter {
l.mu.Lock()
defer l.mu.Unlock()
if limiter, ok := l.buckets[clientID]; ok {
return limiter
}
l.numClients++
numClients := l.numClients
perClientRate := l.globalRate / float64(numClients)
bucketCapacity := l.perClientBucketCapacity(numClients)
// create limiter for this client
newLimiter := gorate.NewLimiter(gorate.Limit(perClientRate), bucketCapacity)
l.buckets[clientID] = newLimiter
// rebalance all limiters (including the new one)
for _, limiter := range l.buckets {
limiter.SetLimit(gorate.Limit(perClientRate))
limiter.SetBurst(bucketCapacity)
}
// metrics + log inside the same critical section
ActiveClients.
WithLabelValues(l.workloadName).
Set(float64(numClients))
PerClientRate.
WithLabelValues(l.workloadName).
Set(perClientRate)
PerClientBucketCapacity.
WithLabelValues(l.workloadName).
Set(float64(bucketCapacity))
log.Infof("New client %s registered for %s rate limiting (clients: %d, rate: %.2f req/s, max bucket capacity: %d)",
clientID, l.workloadName, numClients, perClientRate, bucketCapacity)
return newLimiter
}
```
### 3. Mirror the pattern in `OnClientDisconnect`
You can mirror the above pattern for disconnects, reusing `numClients` and avoiding a separate `countActiveClients` pass:
```go
func (l *Limiter) OnClientDisconnect(clientID string) {
if l.globalRate <= 0 {
return
}
l.mu.Lock()
defer l.mu.Unlock()
if _, ok := l.buckets[clientID]; !ok {
return
}
delete(l.buckets, clientID)
l.numClients--
numClients := l.numClients
if numClients == 0 {
ActiveClients.WithLabelValues(l.workloadName).Set(0)
PerClientRate.WithLabelValues(l.workloadName).Set(0)
PerClientBucketCapacity.WithLabelValues(l.workloadName).Set(0)
log.Infof("Client %s disconnected from %s rate limiting (remaining clients: 0)",
clientID, l.workloadName)
return
}
perClientRate := l.globalRate / float64(numClients)
bucketCapacity := l.perClientBucketCapacity(numClients)
for _, limiter := range l.buckets {
limiter.SetLimit(gorate.Limit(perClientRate))
limiter.SetBurst(bucketCapacity)
}
ActiveClients.WithLabelValues(l.workloadName).Set(float64(numClients))
PerClientRate.WithLabelValues(l.workloadName).Set(perClientRate)
PerClientBucketCapacity.WithLabelValues(l.workloadName).Set(float64(bucketCapacity))
log.Infof("Client %s disconnected from %s rate limiting (remaining clients: %d)",
clientID, l.workloadName, numClients)
}
```
This keeps:
- per-client fair-share semantics,
- dynamic rebalancing on connect/disconnect,
- injectable clock,
- integrated metrics,
but removes:
- double full-map scans for every connect/disconnect,
- `sync.Map` + `countActiveClients` concurrency complexity, in favor of a straightforward `map+mutex` pattern.
</issue_to_address>
Hey @vikin91, I've posted a new review for you!

@sourcery-ai check again

Sure! I'm generating a new review now.
This is to avoid state explosion when using many client IDs.
Co-authored-by: Mladen Todorovic <mtodor@gmail.com>
@vikin91: The following test failed.
Description
Add rate limiting for VM index reports in Central to prevent Scanner V4 overload and OOM kills.
Problem: When many sensors send VM index reports simultaneously, Central's unbounded queue can fill up with large index reports, causing memory pressure and potential OOM kills.
Solution: Implement a token bucket rate limiter with per-sensor fairness:
Key changes:
- pkg/rate – generic, reusable rate limiter package built on golang.org/x/time/rate

Environment variables:
- ROX_VM_INDEX_REPORT_RATE_LIMIT (default: "1.0" req/sec, "0" to disable)
- ROX_VM_INDEX_REPORT_BUCKET_CAPACITY (default: 5 tokens)

AI contribution: Initial implementation and tests generated by AI. Architecture decisions, naming conventions (e.g., bucketCapacity vs burstDuration), float64 rate type, error handling approach (return errors vs panic), and test robustness reviewed and corrected by user.

User-facing documentation
Testing and quality
Automated testing
How I validated my change
Sensor sending:

Central's rate limiter:

Looking at the logs: