
ROX-32316: Add central rate limiter for VM index reports #18220

Merged
vikin91 merged 33 commits into master from piotr/ROX-32316-central-rate-limiter
Jan 21, 2026

Conversation


@vikin91 vikin91 commented Dec 15, 2025

Description

Add rate limiting for VM index reports in Central to prevent Scanner V4 overload and OOM kills.

Problem: When many sensors send VM index reports simultaneously, Central's unbounded queue can fill up with large index reports, causing memory pressure and potential OOM kills.

Solution: Implement a token bucket rate limiter with per-sensor fairness:

  • Global rate limit shared equally among connected sensors (1/N each)
  • Automatic rebalancing when sensors connect/disconnect
  • Configurable via environment variables
  • Sending
    • an ACK back to Sensor when processing succeeded
    • a NACK back to Sensor when the rate limit or another error was hit

Key changes:

  1. pkg/rate - Generic reusable rate limiter package with:

    • Token bucket algorithm using golang.org/x/time/rate
    • Per-sensor fair distribution of global rate
    • Float64 rate support (e.g., 0.5 req/sec)
    • Prometheus metrics for observability
    • Global registry for multiple workload types
  2. Environment variables:

    • ROX_VM_INDEX_REPORT_RATE_LIMIT (default: "1.0" req/sec, "0" to disable)
    • ROX_VM_INDEX_REPORT_BUCKET_CAPACITY (default: 5 tokens)

AI contribution: Initial implementation and tests generated by AI. Architecture decisions, naming conventions (e.g., bucketCapacity vs burstDuration), float64 rate type, error handling approach (return errors vs panic), and test robustness reviewed and corrected by user.

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

  • unit tests locally
  • CI
  • On a real cluster with fake workloads:
  1. Generating index reports faster than 3 per second (a rate that would normally cause Central to go OOM) and observing the rate limiter prevent Central from crashing.

Sensor sending: (screenshot)

Central's rate limiter: (screenshot)

  2. Confirming that the limiter works by looking at the metrics:
# HELP rox_central_rate_limiter_active_clients Current number of active clients being rate limited
# TYPE rox_central_rate_limiter_active_clients gauge
rox_central_rate_limiter_active_clients{workload="vm_index_report"} 1
 
# HELP rox_central_rate_limiter_per_client_bucket_max_tokens Current per-client token bucket capacity (max tokens). Must be a positive integer. This is the maximum number of requests that can be accepted in a burst before rate limiting kicks in.
# TYPE rox_central_rate_limiter_per_client_bucket_max_tokens gauge
rox_central_rate_limiter_per_client_bucket_max_tokens{workload="vm_index_report"} 1

# HELP rox_central_rate_limiter_per_client_bucket_refill_rate_per_second Current per-client rate limit in requests per second. This is also the rate at which tokens are refilled.
# TYPE rox_central_rate_limiter_per_client_bucket_refill_rate_per_second gauge
rox_central_rate_limiter_per_client_bucket_refill_rate_per_second{workload="vm_index_report"} 1

# HELP rox_central_rate_limiter_requests_total Total requests received by the rate limiter
# TYPE rox_central_rate_limiter_requests_total counter
rox_central_rate_limiter_requests_total{outcome="accepted",workload="vm_index_report"} 3523
rox_central_rate_limiter_requests_total{outcome="rejected",workload="vm_index_report"} 34776

Looking at the logs:

➜ logs-central | grep "Rate limit"
sensor/service/pipeline/virtualmachineindex: 2026/01/09 15:49:55.525031 pipeline.go:121: Info: Dropping VM index report VM 00000000-0000-4000-8000-000000001017 from cluster ab10b3ae-ed04-41ee-a47f-24611552e973: rate limit exceeded
sensor/service/pipeline/virtualmachineindex: 2026/01/09 15:50:58.142185 pipeline.go:121: Info: Dropping VM index report VM 00000000-0000-4000-8000-000000001009 from cluster ab10b3ae-ed04-41ee-a47f-24611552e973: rate limit exceeded



openshift-ci bot commented Dec 15, 2025

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all


rhacs-bot commented Dec 15, 2025

Images are ready for the commit at 149a807.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.10.x-852-g149a8077bd.


codecov bot commented Dec 15, 2025

Codecov Report

❌ Patch coverage is 83.65385% with 34 lines in your changes missing coverage. Please review.
✅ Project coverage is 49.13%. Comparing base (30c6100) to head (149a807).
⚠️ Report is 18 commits behind head on master.

Files with missing lines Patch % Lines
...r/service/pipeline/virtualmachineindex/pipeline.go 75.36% 11 Missing and 6 partials ⚠️
pkg/rate/limiter.go 89.14% 11 Missing and 3 partials ⚠️
...ntral/sensor/service/connection/connection_impl.go 0.00% 1 Missing ⚠️
central/sensor/service/connection/context.go 0.00% 1 Missing ⚠️
central/sensor/service/connection/manager_impl.go 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #18220      +/-   ##
==========================================
+ Coverage   49.06%   49.13%   +0.07%     
==========================================
  Files        2650     2653       +3     
  Lines      198921   199265     +344     
==========================================
+ Hits        97592    97904     +312     
- Misses      93896    93918      +22     
- Partials     7433     7443      +10     
Flag Coverage Δ
go-unit-tests 49.13% <83.65%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown.


@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-ack branch from 225c6d4 to 92246b0 on December 17, 2025 15:56
@vikin91 vikin91 force-pushed the piotr/ROX-32316-central-rate-limiter branch from cdefaba to 1bbae43 on December 17, 2025 15:56
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-ack branch from 92246b0 to 54faf82 on December 18, 2025 11:33
@vikin91 vikin91 force-pushed the piotr/ROX-32316-central-rate-limiter branch from 1bbae43 to 8f2bc58 on December 18, 2025 11:33
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-ack branch from 54faf82 to 4395143 on January 2, 2026 14:29
@vikin91 vikin91 force-pushed the piotr/ROX-32316-central-rate-limiter branch 2 times, most recently from fe92563 to 7ec45b6 on January 6, 2026 10:54
Base automatically changed from piotr/ROX-32316-sensor-ack to master on January 8, 2026 16:51
@vikin91 vikin91 force-pushed the piotr/ROX-32316-central-rate-limiter branch from fb2af82 to 06ca2c1 on January 8, 2026 16:57
@vikin91 vikin91 marked this pull request as ready for review January 9, 2026 08:07

vikin91 commented Jan 12, 2026

/retest


@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 4 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `pkg/rate/limiter.go:99-100` </location>
<code_context>
+// TryConsume attempts to consume one token for the given client.
+// Returns true if allowed, false if rate limit exceeded.
+// Metrics are automatically recorded.
+func (l *Limiter) TryConsume(clientID string) (allowed bool, reason string) {
+	if l.globalRate <= 0 {
+		// Rate limiting disabled
+		return true, ""
</code_context>

<issue_to_address>
**suggestion:** Unlimited mode skips all metrics, which may obscure visibility into workload volume.

When `globalRate <= 0`, `TryConsume` returns `true` without updating `RequestsTotal` (or any metrics). That preserves rate-limiting semantics but drops visibility into request volume in “unlimited” mode. If metrics are used to track load (not just rejections), consider still incrementing `RequestsTotal` (and possibly `RequestsAccepted`) when rate limiting is disabled so dashboards remain useful.
</issue_to_address>

### Comment 2
<location> `pkg/rate/metrics.go:38-46` </location>
<code_context>
+		[]string{"workload", "outcome"},
+	)
+
+	// RequestsAccepted tracks requests accepted by the rate limiter (per client).
+	RequestsAccepted = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: metrics.PrometheusNamespace,
+			Subsystem: metrics.CentralSubsystem.String(),
+			Name:      "rate_limiter_requests_accepted_total",
+			Help:      "Requests accepted by the rate limiter",
+		},
+		[]string{"workload", "client_id"},
+	)
+
</code_context>

<issue_to_address>
**suggestion (performance):** Per-client metrics keyed by client_id may introduce high-cardinality Prometheus series.

`RequestsAccepted` and `RequestsRejected` are labeled by `client_id` (cluster ID), which could lead to an unbounded number of series over a Central instance’s lifetime and strain Prometheus. If that’s realistic for your deployments, consider either limiting these per-client metrics to debug builds, sampling/aggregating client IDs (e.g., top-N or hashing), or dropping the `client_id` label from always-on metrics and relying on `ActiveClients` + overall `RequestsTotal`, with per-client detail available via logs or ad‑hoc tooling.
</issue_to_address>

### Comment 3
<location> `pkg/rate/limiter_test.go:93` </location>
<code_context>
+
+func TestTryConsume_Disabled(t *testing.T) {
+	limiter := mustNewLimiter(t, "test", 0, 5)
+	for i := range 100 {
+		allowed, reason := limiter.TryConsume("test-cluster")
+		assert.True(t, allowed, "request %d should be allowed when rate limiting is disabled", i)
</code_context>

<issue_to_address>
**issue (bug_risk):** Loop syntax `for i := range 100` requires Go 1.22+ and may prevent the tests from compiling

In this file there are several loops like `for i := range 100 {`. Ranging over an integer is only valid in Go 1.22 and newer, so with an older `go` directive in the module these tests will not compile. Either ensure the module targets Go 1.22+, or replace them with standard counted loops, e.g. `for i := 0; i < 100; i++ {`, and update all occurrences so the tests build and run everywhere.
</issue_to_address>

### Comment 4
<location> `pkg/rate/registry.go:8` </location>
<code_context>
+)
+
+var (
+	registry     = make(map[string]*Limiter)
+	registryLock sync.RWMutex
+)
</code_context>

<issue_to_address>
**issue (complexity):** Consider removing the global limiter registry in favor of constructing and injecting limiters locally per workload or pipeline.

The new registry does add non‑trivial global complexity (shared mutable state, locks, test reset hook) for behavior that can be achieved with simpler, local construction. You can keep all functionality (including reusing a limiter per workload) without a global singleton.

### 1. Drop the global registry and expose a constructor

Instead of `RegisterLimiter`/`GetLimiter`/`OnClientDisconnectAll`/`ResetForTesting`, keep a simple constructor and let callers own the limiter:

```go
// rate/limiter_factory.go
package rate

// NewWorkloadLimiter constructs a new limiter for a given workload.
func NewWorkloadLimiter(workloadName string, globalRate float64, bucketCapacity int) (*Limiter, error) {
    return NewLimiter(workloadName, globalRate, bucketCapacity)
}
```

Pipeline usage:

```go
// vm_index_pipeline.go
limiter, err := rate.NewWorkloadLimiter("vm_index_report", rateLimit, bucketCapacity)
if err != nil {
    return nil, err
}

p := &pipeline{
    rateLimiter: limiter,
}

func (p *pipeline) OnFinish(clusterID string) {
    p.rateLimiter.OnClientDisconnect(clusterID)
}
```

This removes:
- `registry` map and locks
- `RegisterLimiter`, `GetLimiter`, `OnClientDisconnectAll`, `ResetForTesting`

while keeping:
- one limiter instance per pipeline (or per workload, as you decide)
- explicit `OnClientDisconnect` semantics

### 2. If you truly need shared limiters, inject them instead of globals

If sharing a limiter across multiple callers is a requirement, create it at composition time and inject:

```go
// composition.go
vmIndexLimiter, err := rate.NewWorkloadLimiter("vm_index_report", rateLimit, bucketCapacity)
if err != nil {
    return err
}

pipeline := NewVMPipeline(vmIndexLimiter)
otherConsumer := NewOtherConsumer(vmIndexLimiter)
```

This still avoids global registry state and test resets; tests can construct/destroy limiters as needed:

```go
func TestPipelineRateLimiting(t *testing.T) {
    limiter, _ := rate.NewWorkloadLimiter("test_workload", 10, 100)
    p := NewVMPipeline(limiter)
    // ...
}
```

These changes preserve functionality while significantly reducing indirection, global state, and test‑only hooks.
</issue_to_address>



vikin91 commented Jan 12, 2026

@sourcery-ai check again


sourcery-ai bot commented Jan 12, 2026

Sure! I'm generating a new review now.


@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 6 issues

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `pkg/rate/limiter.go:99-108` </location>
<code_context>
+// TryConsume attempts to consume one token for the given client.
+// Returns true if allowed, false if rate limit exceeded.
+// Metrics are automatically recorded.
+func (l *Limiter) TryConsume(clientID string) (allowed bool, reason string) {
+	if l.globalRate <= 0 {
+		// Rate limiting disabled, but still record metrics for visibility into request volume.
+		RequestsTotal.WithLabelValues(l.workloadName, OutcomeAccepted).Inc()
+		return true, ""
+	}
+
+	limiter := l.getOrCreateLimiter(clientID)
+
+	// Use AllowN with clock.Now() to support time injection for testing.
+	if limiter.AllowN(l.clock.Now(), 1) {
+		RequestsTotal.WithLabelValues(l.workloadName, OutcomeAccepted).Inc()
+		return true, ""
</code_context>

<issue_to_address>
**issue:** Add basic validation/handling for empty clientID in TryConsume to avoid a shared bucket hotspot

Right now an empty clientID gets its own shared bucket under the "" key, so any caller that forgets to pass a real client ID will be lumped together, breaking per-client fairness and making behavior surprising. Consider either rejecting empty client IDs with a clear error/reason, or explicitly treating them as unlimited (no rate limiting, but still metrics) so the misuse is obvious and you avoid a silent hotspot.
</issue_to_address>

### Comment 2
<location> `pkg/rate/limiter.go:138-139` </location>
<code_context>
+		return actual.(*gorate.Limiter)
+	}
+
+	log.Infof("New client %s registered for %s rate limiting (clients: %d, rate: %.2f req/s, max bucket capacity: %d)",
+		clientID, l.workloadName, numClients, perClientRate, bucketCapacity)
+
+	// Rebalance all existing limiters with new client count
</code_context>

<issue_to_address>
**suggestion (performance):** Reconsider using Info level for per-client registration to avoid log noise in large deployments

Logging each new client at Info with full rate/bucket details can be very noisy in environments with many clusters or short‑lived clients, and may obscure more important operational logs. Consider moving this to Debug (or adding sampling) while relying on aggregate Prometheus metrics for visibility by default.

```suggestion
	log.Debugf("New client %s registered for %s rate limiting (clients: %d, rate: %.2f req/s, max bucket capacity: %d)",
		clientID, l.workloadName, numClients, perClientRate, bucketCapacity)
```
</issue_to_address>

### Comment 3
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:400-409` </location>
<code_context>
+func TestPipelineRun_RateLimitEnabled(t *testing.T) {
</code_context>

<issue_to_address>
**suggestion (testing):** Extend rate limit test to assert that the rate-limited request does not perform enrichment or datastore writes and that a NACK is sent when an injector is present

The existing test confirms 5 successful requests and that the 6th returns no error, but it doesn’t actually assert that the 6th request is rate-limited at the pipeline level. Please also assert that `EnrichVirtualMachineWithVulnerabilities` and `UpdateVirtualMachineScan` are not invoked on the 6th request (e.g., by using `Times(5)` or moving expectations outside the loop). In addition, use a mocked `MessageInjector` with a context that has `SensorACKSupport` so you can verify that a `SensorACK` with `Action=NACK` and `MessageType=VM_INDEX_REPORT` is injected for the rate-limited request.

Suggested implementation:

```golang
	// TestPipelineRun_RateLimitEnabled tests that rate limiting rejects requests when enabled
	// and ensures that rate-limited requests do not perform enrichment/datastore writes while
	// emitting a NACK SensorACK when an injector is present.
	func TestPipelineRun_RateLimitEnabled(t *testing.T) {
		t.Setenv(features.VirtualMachines.EnvVar(), "true")

		ctrl := gomock.NewController(t)
		defer ctrl.Finish()

		vmDatastore := vmDatastoreMocks.NewMockDataStore(ctrl)
		enricher := vmEnricherMocks.NewMockVirtualMachineEnricher(ctrl)
		injector := pipelineMocks.NewMockMessageInjector(ctrl)

		// Allow 5 requests, bucket size 5 – the 6th should be rate-limited.
		rateLimiter := mustNewLimiter(t, "test", 5, 5)

		pipeline := &pipelineImpl{
			vmDatastore: vmDatastore,
			enricher:    enricher,
			rateLimiter: rateLimiter,
			injector:    injector,
		}

		const (
			testClusterID = "cluster-id"
			vmID          = "vm-id"
		)

		// Build a context that enables Sensor ACK support so that the pipeline will
		// send ACK/NACK messages via the injector.
		baseCtx := context.Background()
		ctx := sensorCtxWithACKSupport(baseCtx)

		// We expect enrichment and datastore writes only for the first 5 (non-rate-limited) requests.
		enricher.EXPECT().
			EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()).
			Times(5)

		vmDatastore.EXPECT().
			UpdateVirtualMachineScan(gomock.Any(), vmID, gomock.Any()).
			Times(5)

		// Exactly one NACK should be injected for the 6th (rate-limited) request.
		injector.EXPECT().
			InjectMessage(gomock.Any(), gomock.Any()).
			DoAndReturn(func(_ context.Context, msg *central.MsgToSensor) error {
				ack := msg.GetSensorAck()
				require.NotNil(t, ack, "expected SensorACK message for rate-limited request")

				assert.Equal(t, central.SensorAck_NACK, ack.GetAction(), "rate-limited request should be NACKed")
				assert.Equal(t, central.SensorAck_VM_INDEX_REPORT, ack.GetMessageType(), "NACK should be for VM_INDEX_REPORT")

				return nil
			}).
			Times(1)

		// Send 6 requests – the first 5 should be processed successfully,
		// the 6th should be rate-limited.
		for i := 0; i < 6; i++ {
			msg := buildVMIndexReportMsg(vmID)

			err := pipeline.Run(ctx, testClusterID, msg, injector)
			assert.NoError(t, err, "Run should not return an error even when rate-limited (request %d)", i+1)
		}
	}

```

Because only a portion of the file was visible, you will likely need to adjust the following to fit your existing codebase:

1. **MessageInjector mock and field name**
   - Replace `pipelineMocks.NewMockMessageInjector(ctrl)` with the actual mock constructor you use for the `MessageInjector` interface (e.g. it might live under a different mocks package).
   - Ensure the `pipelineImpl` struct actually has an `injector` (or similarly named) field accepting a `MessageInjector`. If the injector is passed differently (e.g. as a parameter or via another struct), wire it up accordingly.

2. **Context helper for SensorACK support**
   - Implement or use your existing helper `sensorCtxWithACKSupport(baseCtx)` to return a context with `SensorACKSupport` enabled. If you already have a helper for this in other tests (e.g. in another pipeline test file), reuse that instead of introducing a new one.

3. **Message builders and enums**
   - Implement or reuse `buildVMIndexReportMsg(vmID)` so it returns the same message type your VM index pipeline normally receives. In existing tests you might already have a helper that builds VM index reports; if so, use that.
   - Adjust enum names used in the ASSERTs to match your actual protobuf types:
     - `central.SensorAck_NACK` should be the correct enum for a NACK action.
     - `central.SensorAck_VM_INDEX_REPORT` should match your VM index report message type enum (if the exact name differs in your proto, update accordingly).

4. **Imports**
   - Ensure the test file imports any additional packages used above:
     - `context`
     - Your `pipelineMocks` (or equivalent) package for the `MessageInjector` mock.
     - `github.com/stackrox/rox/generated/internalapi/central` (or your actual internal API import path) for `central.MsgToSensor` and `central.SensorAck` types.
     - `github.com/stretchr/testify/require` if not already imported for `require.NotNil`.
   - Remove or update any now-unused imports.

5. **Rate limiter wiring**
   - Confirm that `mustNewLimiter` and the `rateLimiter` field on `pipelineImpl` correspond to the actual rate-limiting implementation in this pipeline. If the field name or type differs, adjust accordingly.

6. **Signature of `pipeline.Run`**
   - The call `pipeline.Run(ctx, testClusterID, msg, injector)` assumes the 4th argument is an injector (or compatible type). If your `Run` signature differs, adapt the call and where the injector is passed so that the test actually exercises the NACK path for rate-limited requests.
</issue_to_address>

### Comment 4
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:366-375` </location>
<code_context>
 	assert.NoError(t, err)
 }
+
+// TestPipelineRun_RateLimitDisabled tests that rate limiting is disabled when configured with 0
+func TestPipelineRun_RateLimitDisabled(t *testing.T) {
+	t.Setenv(features.VirtualMachines.EnvVar(), "true")
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	vmDatastore := vmDatastoreMocks.NewMockDataStore(ctrl)
+	enricher := vmEnricherMocks.NewMockVirtualMachineEnricher(ctrl)
+	rateLimiter := mustNewLimiter(t, "test", 0, 50) // Disabled
+
+	pipeline := &pipelineImpl{
+		vmDatastore: vmDatastore,
+		enricher:    enricher,
+		rateLimiter: rateLimiter,
+	}
+
+	vmID := "vm-1"
+	msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE)
+
+	// Should process all 100 requests without rate limiting
+	for i := 0; i < 100; i++ {
+		enricher.EXPECT().
+			EnrichVirtualMachineWithVulnerabilities(gomock.Any(), gomock.Any()).
</code_context>

<issue_to_address>
**suggestion (testing):** Add coverage for behavior when the rate limiter is nil in the pipeline

There’s still an untested branch where `p.rateLimiter` is `nil` and the pipeline logs a warning and sends a NACK (when ACKs are supported) before dropping the report. Please add a test that builds a `pipelineImpl` with `rateLimiter: nil`, uses a context with a connection that has `SensorACKSupport` and a mocked `MessageInjector`, then calls `Run` and verifies that no enrichment/datastore calls occur and exactly one NACK with `MessageType=VM_INDEX_REPORT` is sent. This will cover the nil-limiter branch and guard against regressions.

Suggested implementation:

```golang
		err := pipeline.Run(ctx, testClusterID, msg, nil)
		assert.NoError(t, err, "request %d should succeed with rate limiting disabled", i)
	}
}

// TestPipelineRun_NilRateLimiter_WithACKSupport tests behavior when the rateLimiter is nil and ACKs are supported.
func TestPipelineRun_NilRateLimiter_WithACKSupport(t *testing.T) {
	t.Setenv(features.VirtualMachines.EnvVar(), "true")
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Mocks for datastore and enricher – no expectations should be set on these,
	// because the pipeline must short‑circuit before doing any work.
	vmDatastore := vmDatastoreMocks.NewMockDataStore(ctrl)
	enricher := vmEnricherMocks.NewMockVirtualMachineEnricher(ctrl)

	// Mock message injector that should receive exactly one NACK for VM_INDEX_REPORT.
	msgInjector := messageInjectorMocks.NewMockMessageInjector(ctrl)

	pipeline := &pipelineImpl{
		vmDatastore: vmDatastore,
		enricher:    enricher,
		// rateLimiter must be nil to cover the nil‑limiter branch.
		rateLimiter: nil,
		// Make sure the injector field name matches pipelineImpl's definition.
		injector: msgInjector,
	}

	vmID := "vm-1"
	msg := createVMIndexMessage(vmID, central.ResourceAction_SYNC_RESOURCE)

	// Build a context that has a connection with SensorACKSupport enabled.
	// The helper used here should attach a connection that advertises ACK support
	// and uses msgInjector to send messages back to the sensor.
	ctxWithConn := contextWithACKSupport(ctx, msgInjector)

	// Expect exactly one NACK with MessageType = VM_INDEX_REPORT and no other messages.
	msgInjector.EXPECT().
		Send(gomock.Any()).
		Do(func(m *central.MsgToSensor) {
			ack := m.GetAckResponse()
			require.NotNil(t, ack, "expected an AckResponse message")
			assert.Equal(t, central.AckResponse_NACK, ack.GetStatus(), "expected NACK status")
			assert.Equal(t, central.MsgToSensor_VM_INDEX_REPORT, ack.GetMessageType(), "expected VM_INDEX_REPORT message type")
		}).
		Times(1)

	// Run the pipeline once; it should short‑circuit due to nil rateLimiter,
	// emit a NACK, and not call any datastore/enricher methods.
	err := pipeline.Run(ctxWithConn, testClusterID, msg, nil)
	assert.NoError(t, err, "pipeline Run should not error when rateLimiter is nil")
}

```

To make this compile and integrate correctly, you will also need to:

1. **Import the message injector mocks**  
   Add an import at the top of `pipeline_test.go` for the message injector mock package, matching how mocks are organized in this repo, for example:
   ```go
   messageInjectorMocks "github.com/stackrox/rox/central/sensor/service/pipeline/mocks"
   ```
   or whatever the existing mock package path/name is in your codebase.

2. **Use the correct injector field name on `pipelineImpl`**  
   In the test, `injector: msgInjector,` assumes `pipelineImpl` has a field named `injector` of the appropriate interface type (e.g., `MessageInjector`).  
   If the field is named differently (e.g., `messageInjector` or `toSensor`), or if there is a constructor used elsewhere, update the field name / construction to match the existing struct definition and conventions.

3. **Use the actual context/connection helper for ACK support**  
   Replace `contextWithACKSupport(ctx, msgInjector)` with the real helper used in this test file (or package) to attach a connection that has `SensorACKSupport` enabled and uses `msgInjector`.  
   For example, if you already have something like:
   ```go
   ctx := testContextWithACKSupport(t, msgInjector)
   ```
   or a helper based on a `connection.ContextWithConnection(...)` API, use that instead.

4. **Use the actual injector method name and signature**  
   The example uses:
   ```go
   msgInjector.EXPECT().
       Send(gomock.Any()).
       Do(func(m *central.MsgToSensor) { ... }).
       Times(1)
   ```
   Adjust this to match the real interface method (`InjectMessage`, `SendMessage`, etc.) and its parameters. For instance, if the method is:
   ```go
   InjectMessage(ctx context.Context, msg *central.MsgToSensor) error
   ```
   then the expectation should be:
   ```go
   msgInjector.EXPECT().
       InjectMessage(gomock.Any(), gomock.Any()).
       Do(func(_ context.Context, m *central.MsgToSensor) { ... }).
       Return(nil).
       Times(1)
   ```

5. **Align AckResponse field accessors with your proto**  
   The code assumes:
   ```go
   ack := m.GetAckResponse()
   ack.GetStatus()
   ack.GetMessageType()
   ```
   If your generated proto uses different names (e.g., `GetMsgType` or nested enums), adjust the field and enum names accordingly while still asserting:
   - `Status == NACK`
   - `MessageType == VM_INDEX_REPORT`.

6. **Ensure no expectations on datastore/enricher**  
   The absence of `.EXPECT()` on `vmDatastore` and `enricher` intentionally verifies that no calls are made; if you want to be stricter, you can add:
   ```go
   ctrl.Finish()
   ```
   (already present) and/or `gomock.InOrder` with only the injector expectation, so any stray calls to the other mocks will fail the test.

These adjustments will ensure the new test compiles correctly, exercises the nil-rate-limiter branch, and verifies that the pipeline emits a single NACK for `VM_INDEX_REPORT` without performing enrichment or datastore updates.
</issue_to_address>

### Comment 5
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:188-197` </location>
<code_context>
 func (suite *PipelineTestSuite) TestNewPipeline() {
 	mockDatastore := vmDatastoreMocks.NewMockDataStore(suite.mockCtrl)
 	mockEnricher := vmEnricherMocks.NewMockVirtualMachineEnricher(suite.mockCtrl)
-	pipeline := newPipeline(mockDatastore, mockEnricher)
+	rateLimiter := mustNewLimiter(suite.T(), "test", 0, 50)
+	pipeline := newPipeline(mockDatastore, mockEnricher, rateLimiter)
 	suite.NotNil(pipeline)

 	impl, ok := pipeline.(*pipelineImpl)
 	suite.True(ok, "Should return pipelineImpl instance")
 	suite.Equal(mockDatastore, impl.vmDatastore)
 	suite.Equal(mockEnricher, impl.enricher)
+	suite.Equal(rateLimiter, impl.rateLimiter)
 }

</code_context>

<issue_to_address>
**suggestion (testing):** Add a test to verify that OnFinish propagates cluster disconnect to the rate limiter

Right now, there’s no test that checks `OnFinish` actually passes the cluster ID to the limiter. Please add a test that builds a `pipelineImpl` with a small fake/mocked limiter (with `OnClientDisconnect` capturing the last `clusterID`), calls `OnFinish("cluster-1")`, and asserts the limiter was called with that ID. This will protect the pipeline–limiter integration from regressions.

Suggested implementation:

```golang
var ctx = context.Background()

// mustNewLimiter creates a rate limiter or fails the test.
func mustNewLimiter(t require.TestingT, workloadName string, globalRate float64, bucketCapacity int) *rate.Limiter {
	limiter, err := rate.NewLimiter(workloadName, globalRate, bucketCapacity)
	require.NoError(t, err)
	return limiter
}

// fakeRateLimiter is a lightweight test double that records the last
// cluster ID passed to OnClientDisconnect so we can verify propagation.
type fakeRateLimiter struct {
	lastDisconnectedClusterID string
}

// OnClientDisconnect records the cluster ID; add any other methods needed
// to satisfy the limiter interface used by pipelineImpl.
func (f *fakeRateLimiter) OnClientDisconnect(clusterID string) {
	f.lastDisconnectedClusterID = clusterID
}

func (suite *PipelineTestSuite) TestOnFinishPropagatesClusterDisconnect() {
	// Use a fake limiter so we can observe calls to OnClientDisconnect.
	fakeLimiter := &fakeRateLimiter{}

	p := &pipelineImpl{
		vmDatastore: suite.vmDatastore,
		enricher:    suite.enricher,
		rateLimiter: fakeLimiter,
	}

	const clusterID = "cluster-1"

	p.OnFinish(clusterID)

	suite.Equal(clusterID, fakeLimiter.lastDisconnectedClusterID, "OnFinish should propagate cluster disconnect to the rate limiter")
}

func TestPipeline(t *testing.T) {

```

The above change assumes that:
1. The `rateLimiter` field on `pipelineImpl` is an interface type that is satisfied by any struct exposing `OnClientDisconnect(clusterID string)`. If the actual signature is different (e.g. includes a `context.Context` or returns an error), update `fakeRateLimiter.OnClientDisconnect` to match that exact signature.
2. The limiter interface may have additional methods (e.g. `Allow`, `Wait`, `OnClientConnect`, etc.). If compilation fails due to missing methods, add no-op implementations for those methods on `fakeRateLimiter` so it fully satisfies the interface used by `pipelineImpl`.
3. If `OnFinish` currently has a different signature (e.g. `OnFinish(ctx context.Context, clusterID string)`), adjust the call site in `TestOnFinishPropagatesClusterDisconnect` accordingly, passing a suitable context.
</issue_to_address>

### Comment 6
<location> `pkg/rate/limiter.go:36` </location>
<code_context>
+// Limiter provides per-client fair rate limiting for any workload type.
+// Each client gets an equal share (1/N) of the global capacity, with automatic
+// rebalancing when clients connect or disconnect.
+type Limiter struct {
+	workloadName   string  // name for logging/metrics (e.g., "vm_index_report")
+	globalRate     float64 // requests per second (0 = unlimited)
</code_context>

<issue_to_address>
**issue (complexity):** Consider replacing the sync.Map plus repeated bucket scans with a simple mutex-protected map and a tracked client count to streamline rebalancing logic and concurrency handling.

You can keep the fair-per-client semantics and dynamic rebalancing while simplifying concurrency and reducing iterations over the buckets.

### 1. Replace `sync.Map` + `countActiveClients` with a mutex-protected map and `numClients`

Right now, each new client:

- calls `countActiveClients` (one full `Range`),
- then `rebalanceLimiters`, which calls `countActiveClients` again and `Range`s again to mutate all limiters.

You can simplify by:

- using a `map[string]*rate.Limiter` guarded by a `sync.Mutex`,
- tracking `numClients` directly instead of recomputing it by iterating,
- rebalancing under the same mutex, with only a single `Range`-like iteration per change.

This reduces concurrency complexity and the number of full scans while preserving behavior.

```go
type Limiter struct {
    workloadName   string
    globalRate     float64
    bucketCapacity int

    mu         sync.Mutex
    buckets    map[string]*gorate.Limiter // map[clientID]*rate.Limiter
    numClients int

    clock Clock
}
```

Initialize the map in the constructor:

```go
return &Limiter{
    workloadName:   workloadName,
    globalRate:     globalRate,
    bucketCapacity: bucketCapacity,
    clock:          clock,
    buckets:        make(map[string]*gorate.Limiter),
}, nil
```

### 2. Simplify `getOrCreateLimiter` + rebalancing

Under the mutex:

- check if the limiter exists,
- if not, increment `numClients`, compute per-client rate/burst once,
- create the new limiter,
- update all existing limiters,
- release the lock.

```go
func (l *Limiter) getOrCreateLimiter(clientID string) *gorate.Limiter {
    l.mu.Lock()
    defer l.mu.Unlock()

    if limiter, ok := l.buckets[clientID]; ok {
        return limiter
    }

    l.numClients++
    numClients := l.numClients
    perClientRate := l.globalRate / float64(numClients)
    bucketCapacity := l.perClientBucketCapacity(numClients)

    // create limiter for this client
    newLimiter := gorate.NewLimiter(gorate.Limit(perClientRate), bucketCapacity)
    l.buckets[clientID] = newLimiter

    // rebalance all limiters (including the new one)
    for _, limiter := range l.buckets {
        limiter.SetLimit(gorate.Limit(perClientRate))
        limiter.SetBurst(bucketCapacity)
    }

    // metrics + log inside the same critical section
    ActiveClients.
        WithLabelValues(l.workloadName).
        Set(float64(numClients))
    PerClientRate.
        WithLabelValues(l.workloadName).
        Set(perClientRate)
    PerClientBucketCapacity.
        WithLabelValues(l.workloadName).
        Set(float64(bucketCapacity))

    log.Infof("New client %s registered for %s rate limiting (clients: %d, rate: %.2f req/s, max bucket capacity: %d)",
        clientID, l.workloadName, numClients, perClientRate, bucketCapacity)

    return newLimiter
}
```

### 3. Mirror the pattern in `OnClientDisconnect`

You can mirror the above pattern for disconnects, reusing `numClients` and avoiding a separate `countActiveClients` pass:

```go
func (l *Limiter) OnClientDisconnect(clientID string) {
    if l.globalRate <= 0 {
        return
    }

    l.mu.Lock()
    defer l.mu.Unlock()

    if _, ok := l.buckets[clientID]; !ok {
        return
    }

    delete(l.buckets, clientID)
    l.numClients--
    numClients := l.numClients

    if numClients == 0 {
        ActiveClients.WithLabelValues(l.workloadName).Set(0)
        PerClientRate.WithLabelValues(l.workloadName).Set(0)
        PerClientBucketCapacity.WithLabelValues(l.workloadName).Set(0)

        log.Infof("Client %s disconnected from %s rate limiting (remaining clients: 0)",
            clientID, l.workloadName)
        return
    }

    perClientRate := l.globalRate / float64(numClients)
    bucketCapacity := l.perClientBucketCapacity(numClients)

    for _, limiter := range l.buckets {
        limiter.SetLimit(gorate.Limit(perClientRate))
        limiter.SetBurst(bucketCapacity)
    }

    ActiveClients.WithLabelValues(l.workloadName).Set(float64(numClients))
    PerClientRate.WithLabelValues(l.workloadName).Set(perClientRate)
    PerClientBucketCapacity.WithLabelValues(l.workloadName).Set(float64(bucketCapacity))

    log.Infof("Client %s disconnected from %s rate limiting (remaining clients: %d)",
        clientID, l.workloadName, numClients)
}
```

This keeps:

- per-client fair-share semantics,
- dynamic rebalancing on connect/disconnect,
- injectable clock,
- integrated metrics,

but removes:

- double full-map scans for every connect/disconnect,
- `sync.Map` + `countActiveClients` concurrency complexity, in favor of a straightforward `map+mutex` pattern.
</issue_to_address>


sourcery-ai bot commented Jan 12, 2026

Hey @vikin91, I've posted a new review for you!

vikin91 (author) commented Jan 13, 2026

@sourcery-ai check again

sourcery-ai bot commented Jan 13, 2026

Sure! I'm generating a new review now.

@vikin91 vikin91 force-pushed the piotr/ROX-32316-central-rate-limiter branch from c3b7da9 to b2bcd0d Compare January 20, 2026 16:08
Co-authored-by: Mladen Todorovic <mtodor@gmail.com>
openshift-ci bot commented Jan 21, 2026

@vikin91: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

| Test name | Commit | Details | Required | Rerun command |
| --- | --- | --- | --- | --- |
| ci/prow/gke-qa-e2e-tests | 149a807 | link | false | /test gke-qa-e2e-tests |


@vikin91 vikin91 merged commit dfac06f into master Jan 21, 2026
98 of 99 checks passed
@vikin91 vikin91 deleted the piotr/ROX-32316-central-rate-limiter branch January 21, 2026 15:45
