Conversation
This change is part of the following stack; change managed by git-spice.

Skipping CI for Draft Pull Request.
Hey there - I've reviewed your changes and found some issues that need to be addressed.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `NewUnconfirmedMessageHandler`, the cleanup goroutine closes `h.ch`/`h.ackCh` on `ctx.Done`, but `time.AfterFunc` callbacks in `onTimerFired` can still run and attempt to send on `h.ch`, which will panic if the close wins the race; consider not closing these channels or guarding sends with an additional stopper/signal instead of `ctx` alone.
- The VM relay’s `staleAckThreshold` config and `cachedReport.lastAckedAt` field are currently never used, while the env var comment describes behavior that depends on them; either wire this threshold into the rate limiting/drop logic or remove the unused field/parameter to avoid confusion and dead code.
## Individual Comments
### Comment 1
<location> `pkg/retry/handler/unconfirmed_message_handler.go:57-66` </location>
<code_context>
+ defer cancel()
+
+ errChan := make(chan error, 1)
+ go func() {
+ 	errChan <- relay.Run(ctx)
+ }()
</code_context>
<issue_to_address>
**issue (bug_risk):** Sending on closed channels is possible after context cancellation, which can panic.
`HandleACK` can still do a non-blocking `select { case h.ackCh <- resourceID: default: }` after cancellation, and `onTimerFired` can race with the cleanup goroutine when sending on `h.ch`, even though it checks `ctx.Err()`. Both cases can end up sending on a closed channel.
To avoid this, either:
- Don’t close `h.ch`/`h.ackCh` and let consumers exit on `ctx`, or
- Encapsulate send logic so you can reliably guard against closed channels, or
- Use a separate `stop`/done channel instead of closing the data channels.
Given the lifecycle here, relying on context cancellation and leaving the data channels open is likely the safest option.
</issue_to_address>
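One way to realize the third option (a dedicated stop channel) is sketched below. The `handler` shape and `trySend` name are illustrative, not taken from the PR; the point is that the data channel is never closed, so a send that races with shutdown can at worst be dropped, never panic:

```go
package main

import "fmt"

// Sketch: the data channel stays open for the process lifetime;
// a dedicated stop channel gates sends instead of ctx alone.
type handler struct {
	ch   chan string
	stop chan struct{}
}

// trySend is safe to call from time.AfterFunc callbacks: it never
// closes h.ch, so even a send that races with Stop cannot panic.
func (h *handler) trySend(id string) bool {
	select {
	case <-h.stop: // shutdown won the race; drop the signal
		return false
	default:
	}
	select {
	case h.ch <- id:
		return true
	case <-h.stop:
		return false
	default:
		return false // buffer full: drop, mirroring the non-blocking send
	}
}

func (h *handler) Stop() { close(h.stop) }

func main() {
	h := &handler{ch: make(chan string, 1), stop: make(chan struct{})}
	fmt.Println(h.trySend("res-1")) // true
	h.Stop()
	fmt.Println(h.trySend("res-2")) // false: dropped, no panic
}
```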
### Comment 2
<location> `compliance/virtualmachines/relay/relay.go:100` </location>
<code_context>
+ }
+ r.markAcked(vsockID)
+
+ case vsockID := <-r.umh.RetryCommand():
+ // Relay no longer stores reports; rely on agent resubmission.
+ log.Debugf("Retry requested for VSOCK %s, ignoring (no cached report retained)", vsockID)
</code_context>
<issue_to_address>
**issue (bug_risk):** RetryCommand receive ignores the `ok` value, which will treat a closed channel as a retry for an empty resource ID.
In the main `select` you correctly use a two-value receive for `AckedResources()`, but for `RetryCommand()` you only read a single value:
```go
case vsockID := <-r.umh.RetryCommand():
// ...
```
If `RetryCommand()` is ever closed, this case will keep firing with `vsockID == ""`, producing bogus retries. It would be safer to mirror the ACK path and handle the `ok` value explicitly:
```go
case vsockID, ok := <-r.umh.RetryCommand():
	if !ok {
		log.Warn("UMH retry channel closed; stopping relay")
		return ctx.Err()
	}
	// existing logic
```
or otherwise explicitly detect and handle the closed-channel case.
</issue_to_address>
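For background, the closed-channel behavior this comment relies on is easy to demonstrate in isolation: a closed channel first drains its buffer with `ok == true`, then yields the zero value with `ok == false` immediately and forever.

```go
package main

import "fmt"

func main() {
	ch := make(chan string, 1)
	ch <- "vm-1"
	close(ch)

	v, ok := <-ch
	fmt.Println(v, ok) // vm-1 true: buffered value still delivered

	v, ok = <-ch
	fmt.Println(v, ok) // "" false: closed and drained

	// A single-value receive cannot distinguish the two cases:
	// it returns the zero value immediately, in a tight loop, forever.
	fmt.Println(<-ch == "") // true
}
```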
### Comment 3
<location> `compliance/virtualmachines/relay/relay.go:50-59` </location>
<code_context>
+ staleAckThreshold time.Duration
</code_context>
<issue_to_address>
**issue (bug_risk):** The `staleAckThreshold` configuration is unused in the relay logic.
`Relay` stores `staleAckThreshold` (from `env.VMRelayStaleAckThreshold`), but nothing in the relay logic uses it when deciding to forward or drop reports. Given the env var docstring (“If rate limited and ACK is stale, the report is dropped...”), this looks like intended but missing behavior. Please either implement the stale-ACK handling (likely using `cache.updatedAt` / `lastAckedAt` when rate limiting applies), or remove the unused field and env var so the config isn’t a no-op.
</issue_to_address>
### Comment 4
<location> `central/sensor/service/pipeline/nodeinventory/pipeline.go:154` </location>
<code_context>
- }
-}
+ // Always send SensorACK (new path).
+ _ = injector.InjectMessage(ctx, &central.MsgToSensor{
+ 	Msg: &central.MsgToSensor_SensorAck{
+ 		SensorAck: &central.SensorACK{
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Errors from InjectMessage are now ignored for both SensorACK and legacy NodeInventoryACK paths.
The previous `sendComplianceAck` logged a warning on `InjectMessage` failures for legacy NodeInventoryACK. The new code now sends both SensorACK and legacy ACK but discards both errors via `_ = injector.InjectMessage(...)`.
Silently swallowing these errors makes diagnosing connectivity/permission issues between Central and Sensor harder. Please at least log a warning if either injection fails—for example, collect both errors and emit a single warning if any is non-nil, even if you don’t return the error upstream.
Suggested implementation:
```golang
// Always send SensorACK (new path).
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
	Msg: &central.MsgToSensor_SensorAck{
		SensorAck: &central.SensorACK{
			Action:      convertLegacyActionToSensor(t),
			MessageType: central.SensorACK_NODE_INVENTORY,
			ResourceId:  nodeName,
		},
	},
}); err != nil {
	log.Warnf("failed to inject SensorACK for node inventory reply (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err)
}
```
```golang
// Always send legacy NodeInventoryACK for backward compatibility.
// TODO(ROX-): remove once all sensors support SensorACK for node inventory.
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
```
To fully implement the suggested behavior for the legacy NodeInventoryACK path, you should:
1. Wrap the full legacy `InjectMessage` call in an `if err := ...; err != nil { ... }` block, similar to the SensorACK path. For example, once you can see the entire block, change:
```go
// Always send legacy NodeInventoryACK for backward compatibility.
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
	Msg: &central.MsgToSensor_NodeInventoryAck{
		NodeInventoryAck: &central.NodeInventoryACK{
			// existing fields...
		},
	},
}); err != nil {
	log.Warnf("failed to inject legacy NodeInventoryACK for node inventory reply (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err)
}
```
2. Ensure that `log` is a valid logger in this file (most StackRox code uses `var log = logging.LoggerForModule()` at the top). If this file uses a different logger variable or logging package, adjust the `log.Warnf` calls accordingly.
This will ensure that both SensorACK and legacy NodeInventoryACK injection failures are logged, making connectivity/permission issues between Central and Sensor diagnosable without changing the function’s return behavior.
</issue_to_address>
### Comment 5
<location> `central/sensor/service/pipeline/nodeindex/pipeline.go:134` </location>
<code_context>
- }
-}
+ // Always send SensorACK (new path).
+ _ = injector.InjectMessage(ctx, &central.MsgToSensor{
+ 	Msg: &central.MsgToSensor_SensorAck{
+ 		SensorAck: &central.SensorACK{
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Similar to node inventory, NodeIndex ACK injection errors are ignored, which can hide delivery problems.
This change now drops any `InjectMessage` errors, whereas previously ACK send failures were logged. Even if the pipeline shouldn’t fail on these errors, they should still be logged (with the node identifier) when either the SensorACK or legacy NodeInventoryACK injection fails, to preserve operational visibility into delivery issues.
Suggested implementation:
```golang
// Always send SensorACK (new path).
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
	Msg: &central.MsgToSensor_SensorAck{
		SensorAck: &central.SensorACK{
			Action:      central.SensorACK_ACK,
			MessageType: central.SensorACK_NODE_INDEX_REPORT,
			ResourceId:  node.GetName(),
		},
	},
}); err != nil {
	// Do not fail the pipeline on ACK delivery problems, but preserve visibility.
	log.Errorf("failed to inject SensorACK for node index report (node=%s): %v", node.GetName(), err)
}
// Always send legacy NodeInventoryACK for backward compatibility.
```
1. Apply the same pattern to the legacy `NodeInventoryACK` injection below this snippet:
- Capture the error returned by `injector.InjectMessage`.
- Log it (with node identifier) instead of ignoring it, without changing control flow.
2. Ensure the logger (`log.Errorf` or equivalent) matches existing logging conventions in this package (e.g., you may need to use a package-specific logger or `log.Warnf` if that is preferred).
3. If `log` is not already imported/available, add the appropriate import or use the locally established logging helper consistent with the rest of `pipeline.go`.
</issue_to_address>
### Comment 6
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:439-448` </location>
<code_context>
+func TestSendVMIndexReportResponse_SensorACKAndLegacyFallback(t *testing.T) {
</code_context>
<issue_to_address>
**suggestion (testing):** Consider extending coverage to NACK responses and reason propagation in VM index ACK tests
This test covers the ACK path with SensorACK capability and the no-capability case. To fully validate the new behavior, please also add: (1) a case where `action` is `central.SensorACK_NACK` that asserts the SensorACK `action` and `reason`, and (2) a case where a NACK is sent but `HasCapability` returns false, confirming no SensorACK is produced. This will cover the negative path and reason-handling logic for VM index responses.
Suggested implementation:
```golang
	err := pipeline.Run(ctx, testClusterID, msg, nil)
	assert.NoError(t, err, "rate limited request should not return error (NACK sent instead)")
}

func TestSendVMIndexReportResponse_SensorACKNACK(t *testing.T) {
	t.Setenv(features.VirtualMachines.EnvVar(), "true")
	ctx := context.Background()

	// Capability path: NACK should produce SensorACK with reason
	ackInjector := &vmRecordingInjector{hasAckCap: true}
	sendVMIndexReportResponse(ctx, "vm-nack-1", central.SensorACK_NACK, "validation failed", ackInjector)

	require.Len(t, ackInjector.sensorAcks, 1)
	protoassert.Equal(t, &central.SensorACK{
		Action:      central.SensorACK_NACK,
		MessageType: central.SensorACK_VM_INDEX_REPORT,
		ResourceId:  "vm-nack-1",
		Reason:      "validation failed",
	}, ackInjector.sensorAcks[0])
}

func TestSendVMIndexReportResponse_SensorACKNACK_LegacyFallback(t *testing.T) {
	t.Setenv(features.VirtualMachines.EnvVar(), "true")
	ctx := context.Background()

	// Legacy path (no capability): NACK should not produce a SensorACK
	ackInjector := &vmRecordingInjector{hasAckCap: false}
	sendVMIndexReportResponse(ctx, "vm-nack-2", central.SensorACK_NACK, "legacy path", ackInjector)

	require.Len(t, ackInjector.sensorAcks, 0)
}

func TestSendVMIndexReportResponse_SensorACKAndLegacyFallback(t *testing.T) {
```
1. These tests assume that `context` is already imported (likely true since other tests use `ctx`). If it is not, add `import "context"` to the import block.
2. The tests also assume that `vmRecordingInjector` records `SensorACK` messages in a `sensorAcks` slice and that `sendVMIndexReportResponse` respects `hasAckCap` for emitting SensorACKs. If field names are different, adjust `vmRecordingInjector{hasAckCap: ...}` and `ackInjector.sensorAcks` accordingly.
</issue_to_address>
### Comment 7
<location> `pkg/retry/handler/unconfirmed_message_handler_test.go:123` </location>
<code_context>
}
}
+
+func (suite *UnconfirmedMessageHandlerTestSuite) TestMultipleResources() {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for AckedResources channel behavior and shutdown semantics of the new UMH
`TestMultipleResources` covers per-resource retry behavior well. To fully exercise the new API, please add tests that (1) verify ACKs cause the correct resource IDs to be emitted on `AckedResources()` and remain non-blocking when the channel is full, and (2) verify that after the context is canceled, both `RetryCommand()` and `AckedResources()` channels are closed and no further retries are emitted. This will validate the new observer channel and its shutdown semantics so callers can safely range over these channels.
Suggested implementation:
```golang
func (suite *UnconfirmedMessageHandlerTestSuite) TestMultipleResources() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	baseInterval := time.Second
	umh := NewUnconfirmedMessageHandler(ctx, "test", baseInterval)

	// Send for two different resources
	umh.ObserveSending("resource-1")
	umh.ObserveSending("resource-2")

	// ACK only resource-1 and verify it is emitted on AckedResources().
	umh.HandleACK("resource-1")
	select {
	case resourceID := <-umh.AckedResources():
		if resourceID != "resource-1" {
			suite.T().Fatalf("expected acked resource %q, got %q", "resource-1", resourceID)
		}
	case <-time.After(2 * time.Second):
		suite.T().Fatal("timed out waiting for acked resource on AckedResources()")
	}

	// Verify HandleACK remains non-blocking even when many ACKs are sent
	// (simulating a full channel).
	nonBlockingDone := make(chan struct{})
	go func() {
		defer close(nonBlockingDone)
		for i := 0; i < 100; i++ {
			umh.HandleACK(fmt.Sprintf("resource-%d", i))
		}
	}()
	select {
	case <-nonBlockingDone:
		// OK: HandleACK did not block even under load.
	case <-time.After(500 * time.Millisecond):
		suite.T().Fatal("HandleACK blocked when AckedResources channel should remain non-blocking")
	}

	// Verify shutdown semantics:
	// After context cancellation, callers can safely range over RetryCommand()
	// and AckedResources() and no further retries are emitted.
	retryCh := umh.RetryCommand()
	ackedCh := umh.AckedResources()

	// Ensure there is at least one in-flight retry for resource-2 before we
	// cancel, so that the retry machinery is active.
	umh.HandleNACK("resource-2")
	select {
	case <-retryCh:
		// A retry for resource-2 has been emitted; the retry loop is active.
	case <-time.After(3 * baseInterval):
		suite.T().Fatal("timed out waiting for retry before canceling context")
	}

	// Now cancel the context and verify both observer channels eventually close.
	cancel()
	channelsClosed := make(chan struct{})
	go func() {
		defer close(channelsClosed)
		// Ranging over these channels must eventually complete once the handler shuts down.
		for range retryCh {
		}
		for range ackedCh {
		}
	}()
	select {
	case <-channelsClosed:
		// OK: both channels have been closed and are safe to range over.
	case <-time.After(2 * time.Second):
		suite.T().Fatal("timed out waiting for RetryCommand and AckedResources channels to close after cancel")
	}
}
```
1. The updated test uses `fmt.Sprintf`, so ensure `fmt` is imported at the top of `unconfirmed_message_handler_test.go` if it is not already:
```go
import (
	"context"
	"fmt"
	"time"
	// other imports...
)
```
2. If your project uses `require`/`assert` from `testify` in this test file, you may prefer to replace the `suite.T().Fatal` / `Fatalf` calls with `require.FailNow` / `require.Equal` for consistency.
3. This test assumes `AckedResources()` and `RetryCommand()` return the same underlying channels on each call (read-only or bidirectional). If the API differs (e.g., they return new channels or typed commands), adjust the type assertions and equality checks accordingly.
</issue_to_address>
### Comment 8
<location> `compliance/virtualmachines/relay/relay_test.go:215` </location>
<code_context>
}
+// TestRelay_RateLimiting verifies that rate limiting works
+func (s *relayTestSuite) TestRelay_RateLimiting() {
+ // Create mock sender
+ done := concurrency.NewSignal()
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding tests for UMH interaction in the relay, not just rate limiting
Since the relay now wires in an `UnconfirmedMessageHandler`, it would be valuable to add a test using a `mockUMH` that records `ObserveSending` calls and emits ACKs via `AckedResources()`. That test could assert that `ObserveSending` is called with the correct VSOCK ID when a report is forwarded, that ACKs trigger `markAcked` updates without disrupting normal processing, and that `RetryCommand()` messages are ignored as expected.
</issue_to_address>
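A minimal mock along those lines might look like this. The interface methods are the ones shown elsewhere in this review (`ObserveSending`, `RetryCommand`, `AckedResources`); the mock's field names and buffer sizes are illustrative:

```go
package main

import "fmt"

// mockUMH records ObserveSending calls and forwards ACKs on a buffered
// channel, so a relay test can assert on both without real timers.
type mockUMH struct {
	observed []string
	retryCh  chan string
	ackedCh  chan string
}

func newMockUMH() *mockUMH {
	return &mockUMH{
		retryCh: make(chan string, 1),
		ackedCh: make(chan string, 1),
	}
}

func (m *mockUMH) HandleACK(id string)            { m.ackedCh <- id }
func (m *mockUMH) HandleNACK(id string)           {}
func (m *mockUMH) ObserveSending(id string)       { m.observed = append(m.observed, id) }
func (m *mockUMH) RetryCommand() <-chan string    { return m.retryCh }
func (m *mockUMH) AckedResources() <-chan string  { return m.ackedCh }

func main() {
	m := newMockUMH()
	m.ObserveSending("vsock-42") // the relay would call this before sending
	m.HandleACK("vsock-42")      // simulate Central's ACK arriving
	fmt.Println(m.observed[0], <-m.AckedResources())
}
```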
### Comment 9
<location> `pkg/retry/handler/unconfirmed_message_handler.go:197` </location>
<code_context>
+ }
+}
+
+// calculateNextInterval returns the next retry interval with exponential backoff.
+func (h *UnconfirmedMessageHandlerImpl) calculateNextInterval(retry int32) time.Duration {
+ nextSeconds := (retry + 1) * int32(h.baseInterval.Seconds())
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the retry backoff math and unifying the locking style in `onTimerFired` to make the handler’s behavior easier to reason about.
You can simplify a couple of things without changing behavior:
### 1. Remove `fmt.Sprintf` + `time.ParseDuration` in `calculateNextInterval`
You already work with `time.Duration`, so you can avoid string round-trips and make the backoff clearer:
```go
// calculateNextInterval returns the next retry interval with exponential backoff.
func (h *UnconfirmedMessageHandlerImpl) calculateNextInterval(retry int32) time.Duration {
	if h.baseInterval <= 0 {
		return defaultBaseInterval
	}
	next := time.Duration(retry+1) * h.baseInterval
	if next <= 0 { // overflow guard
		return defaultBaseInterval
	}
	return next
}
```
This keeps the same semantics (“(retry+1) * baseInterval”, with a sane fallback) but is easier to reason about and removes the error path.
### 2. Use a single locking style in `onTimerFired`
Right now `onTimerFired` is the only place using `concurrency.WithLock`, while the rest of the type uses explicit `mu.Lock`/`Unlock`. Using one style everywhere makes invariants easier to track, especially with timers firing concurrently.
You can inline the lock and keep the rest of the logic as-is:
```go
func (h *UnconfirmedMessageHandlerImpl) onTimerFired(resourceID string) {
	if h.ctx.Err() != nil {
		return
	}

	shouldSignal := false
	h.mu.Lock()
	state, exists := h.resources[resourceID]
	if exists && state.numUnackedSendings > 0 {
		state.retry++
		nextInterval := h.calculateNextInterval(state.retry)
		log.Infof("[%s] Resource %s has %d unacked messages, suggesting retry %d (next in %s)",
			h.handlerName, resourceID, state.numUnackedSendings, state.retry, nextInterval)
		h.resetTimer(resourceID, state, nextInterval)
		shouldSignal = true
	}
	h.mu.Unlock()

	if !shouldSignal {
		return
	}
	select {
	case h.ch <- resourceID:
	default:
		log.Warnf("[%s] Retry channel full, dropping retry signal for %s", h.handlerName, resourceID)
	}
}
```
This preserves the behavior (including non-blocking send and timer reset) but removes the mixed locking patterns, which should make future changes safer and easier to reason about.
</issue_to_address>
### Comment 10
<location> `compliance/virtualmachines/relay/relay.go:26` </location>
<code_context>
Start(ctx context.Context) (<-chan *v1.IndexReport, error)
}
+// cachedReport holds metadata about the last update/ACK timestamps for a VSOCK ID.
+type cachedReport struct {
+ updatedAt time.Time
</code_context>
<issue_to_address>
**issue (complexity):** Consider removing the unused cache, stale-ACK fields, and ACK channel handling until they influence behavior to keep the relay’s logic smaller and easier to understand.
`cachedReport`, `staleAckThreshold`, and the ACK channel wiring add state and locking without affecting behavior. You can drop them for now and reintroduce when the stale-ACK logic is actually used. This keeps rate limiting + UMH retry integration while reducing cognitive load.
**1. Remove unused cache-related state and parameter**
```go
// Before
type Relay struct {
	reportStream IndexReportStream
	reportSender sender.IndexReportSender
	umh          UnconfirmedMessageHandler

	// Rate limiting config
	maxReportsPerMinute float64
	staleAckThreshold   time.Duration

	// cache stores metadata for each VSOCK ID.
	cache map[string]*cachedReport
	// limiters stores per-VSOCK rate limiters (leaky bucket, no burst).
	limiters map[string]*rate.Limiter
	// mu guards cache and limiters.
	mu sync.Mutex
}
```
```go
// After
type Relay struct {
	reportStream        IndexReportStream
	reportSender        sender.IndexReportSender
	umh                 UnconfirmedMessageHandler
	maxReportsPerMinute float64
	limiters            map[string]*rate.Limiter
	mu                  sync.Mutex
}
```
```go
// Before
func New(
	reportStream IndexReportStream,
	reportSender sender.IndexReportSender,
	umh UnconfirmedMessageHandler,
	maxReportsPerMinute float64,
	staleAckThreshold time.Duration,
) *Relay {
	return &Relay{
		reportStream:        reportStream,
		reportSender:        reportSender,
		umh:                 umh,
		maxReportsPerMinute: maxReportsPerMinute,
		staleAckThreshold:   staleAckThreshold,
		cache:               make(map[string]*cachedReport),
		limiters:            make(map[string]*rate.Limiter),
	}
}
```
```go
// After
func New(
	reportStream IndexReportStream,
	reportSender sender.IndexReportSender,
	umh UnconfirmedMessageHandler,
	maxReportsPerMinute float64,
) *Relay {
	return &Relay{
		reportStream:        reportStream,
		reportSender:        reportSender,
		umh:                 umh,
		maxReportsPerMinute: maxReportsPerMinute,
		limiters:            make(map[string]*rate.Limiter),
	}
}
```
**2. Drop `cachedReport`, `cacheReport`, `markAcked`, and `AckedResources` until they’re used**
```go
// Before
type UnconfirmedMessageHandler interface {
	HandleACK(resourceID string)
	HandleNACK(resourceID string)
	ObserveSending(resourceID string)
	RetryCommand() <-chan string
	AckedResources() <-chan string
}
```
```go
// After
type UnconfirmedMessageHandler interface {
	HandleACK(resourceID string)
	HandleNACK(resourceID string)
	ObserveSending(resourceID string)
	RetryCommand() <-chan string
	// AckedResources can be added back once ACK metadata affects behavior.
}
```
```go
// Before (Run)
for {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case vsockID, ok := <-r.umh.AckedResources():
		if !ok {
			log.Warn("UMH ack channel closed; stopping relay")
			return ctx.Err()
		}
		r.markAcked(vsockID)
	case vsockID := <-r.umh.RetryCommand():
		log.Debugf("Retry requested for VSOCK %s, ignoring (no cached report retained)", vsockID)
	case report := <-reportChan:
		...
		r.handleIncomingReport(ctx, report)
	}
}
```
```go
// After (Run)
for {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case vsockID := <-r.umh.RetryCommand():
		log.Debugf("Retry requested for VSOCK %s, ignoring (no cached report retained)", vsockID)
	case report := <-reportChan:
		if report == nil {
			log.Warn("Received nil report, skipping")
			continue
		}
		r.handleIncomingReport(ctx, report)
	}
}
```
```go
// Before (handleIncomingReport)
func (r *Relay) handleIncomingReport(ctx context.Context, report *v1.IndexReport) {
	vsockID := report.GetVsockCid()
	// Always cache metadata for the latest report
	r.cacheReport(report)
	// Check rate limit
	if !r.tryConsume(vsockID) { ... }
	r.umh.ObserveSending(vsockID)
	if err := r.reportSender.Send(ctx, report); err != nil { ... }
}
```
```go
// After (handleIncomingReport)
func (r *Relay) handleIncomingReport(ctx context.Context, report *v1.IndexReport) {
	vsockID := report.GetVsockCid()
	if !r.tryConsume(vsockID) {
		log.Debugf("Rate limited for VSOCK %s; dropping report and relying on agent retry", vsockID)
		return
	}
	r.umh.ObserveSending(vsockID)
	if err := r.reportSender.Send(ctx, report); err != nil {
		log.Errorf("Failed to send report (vsock CID: %s): %v", vsockID, err)
	}
}
```
By pruning the unused cache and ACK channel path, `Run` focuses on the behaviors that actually matter today (report processing, rate limiting, and UMH retry commands), and you avoid carrying around speculative state until it’s wired into real decisions.
</issue_to_address>
Images are ready for the commit at 0eba237. To use with deploy scripts, first …

@sourcery-ai recheck

Sure! I'm generating a new review now.
Codecov Report

❌ Patch coverage is … Additional details and impacted files:

```
@@                   Coverage Diff                    @@
##           piotr/ROX-32316-sensor-new-ack   #18267      +/-   ##
==================================================================
- Coverage                           48.89%   48.88%   -0.02%
==================================================================
  Files                                2627     2628       +1
  Lines                              198377   198969     +592
==================================================================
+ Hits                                96998    97257     +259
- Misses                              93977    94309     +332
- Partials                             7402     7403       +1
==================================================================
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
|
Introduce a reusable token bucket rate limiter in pkg/rate that provides per-sensor fair rate limiting. Each sensor gets an equal share (1/N) of the global rate with automatic rebalancing on connect/disconnect.

Key features:
- Float64 rate support for sub-1 req/sec limits (e.g., 0.5)
- Configurable via ROX_VM_INDEX_REPORT_RATE_LIMIT (default: 2.0 req/s)
- Per-sensor bucket capacity via ROX_VM_INDEX_REPORT_BUCKET_CAPACITY
- Prometheus metrics for accepted/rejected requests
- Global registry for multiple workload types

Integrated into virtualmachineindex pipeline with ACK/NACK responses sent back to Sensor when rate limit is exceeded.

AI-assisted: Initial implementation and tests generated by AI.
User-reviewed: Architecture decisions, naming conventions, float64 rate type, error handling (return errors vs panic), and test robustness.
This is to avoid state explosion when using many client IDs.
… via UMH

Why: dual-send SensorACK + legacy ACKs for node inventory/index while keeping VM index SensorACK-only; forward VM SensorACKs to Compliance/UMH; remove relay’s cached payload retries and rely on agent resend; add compliance send metrics; simplify sender to single-attempt; tighten UMH timer/lock handling; minor env/rate tweaks.

refactor(umh): replace AckedResources channel with OnACK callback

Simplifies ACK handling by using a callback instead of a channel. The callback is invoked outside the lock to avoid deadlocks. AI-generated, user-corrected locking and test sync patterns.
Empty Commit
Add VM/Sensor ACK wiring, drop relay payload cache, and route retries via UMH
Why: dual-send SensorACK + legacy ACKs for node inventory/index while keeping VM index SensorACK-only; forward VM SensorACKs to Compliance/UMH; remove relay’s cached payload retries and rely on agent resend; add compliance send metrics; simplify sender to single-attempt; tighten UMH timer/lock handling; minor env/rate tweaks.
Description
change me!
User-facing documentation
Testing and quality
Automated testing
How I validated my change
change me!
Metrics
Central
Sensor