ROX-32316: Add VM/Sensor ACK wiring#18267

Closed
vikin91 wants to merge 25 commits into master from piotr/ROX-32316-compliance-retry-logic

Conversation


@vikin91 vikin91 commented Dec 17, 2025

Empty Commit

Add VM/Sensor ACK wiring, drop relay payload cache, and route retries via UMH

Why:
  • dual-send SensorACK plus legacy ACKs for node inventory/index, while keeping VM index SensorACK-only
  • forward VM SensorACKs to Compliance/UMH
  • remove the relay’s cached-payload retries and rely on agent resend
  • add compliance send metrics
  • simplify the sender to single-attempt
  • tighten UMH timer/lock handling
  • minor env/rate tweaks

Description

change me!

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

change me!

Metrics

Central

# HELP rox_central_rate_limiter_per_sensor_bucket_max_tokens Current per-sensor token bucket capacity (max tokens). Must be a positive integer. This is the maximum number of requests that can be accepted in a burst before rate limiting kicks in.
# TYPE rox_central_rate_limiter_per_sensor_bucket_max_tokens gauge
rox_central_rate_limiter_per_sensor_bucket_max_tokens{workload="vm-index-report"} 5
# HELP rox_central_rate_limiter_per_sensor_bucket_refill_rate_per_second Current per-sensor rate limit in requests per second. This is also the rate at which tokens are refilled.
# TYPE rox_central_rate_limiter_per_sensor_bucket_refill_rate_per_second gauge
rox_central_rate_limiter_per_sensor_bucket_refill_rate_per_second{workload="vm-index-report"} 0.0001
# HELP rox_central_rate_limiter_requests_accepted_total Requests accepted by the rate limiter
# TYPE rox_central_rate_limiter_requests_accepted_total counter
rox_central_rate_limiter_requests_accepted_total{sensor_id="fa85ad6d-f729-4c5b-91b6-47021ea91a34",workload="vm-index-report"} 5
# HELP rox_central_rate_limiter_requests_rejected_total Requests rejected by the rate limiter
# TYPE rox_central_rate_limiter_requests_rejected_total counter
rox_central_rate_limiter_requests_rejected_total{reason="rate limit exceeded",sensor_id="fa85ad6d-f729-4c5b-91b6-47021ea91a34",workload="vm-index-report"} 11
# HELP rox_central_rate_limiter_requests_total Total requests received by the rate limiter
# TYPE rox_central_rate_limiter_requests_total counter
rox_central_rate_limiter_requests_total{outcome="accepted",workload="vm-index-report"} 5
rox_central_rate_limiter_requests_total{outcome="rejected",workload="vm-index-report"} 11

Sensor

# HELP rox_sensor_virtual_machine_index_report_acks_received_total Total number of ACK/NACK responses received from Central for VM index reports
# TYPE rox_sensor_virtual_machine_index_report_acks_received_total counter
rox_sensor_virtual_machine_index_report_acks_received_total{action="ACK"} 12
rox_sensor_virtual_machine_index_report_acks_received_total{action="NACK"} 11


vikin91 commented Dec 17, 2025

This change is part of the following stack:

Change managed by git-spice.


openshift-ci bot commented Dec 17, 2025

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all


@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes and found some issues that need to be addressed.

  • In NewUnconfirmedMessageHandler, the cleanup goroutine closes h.ch/h.ackCh on ctx.Done, but time.AfterFunc callbacks in onTimerFired can still run and attempt to send on h.ch, which will panic if the close wins the race; consider not closing these channels or guarding sends with an additional stopper/signal instead of ctx alone.
  • The VM relay’s staleAckThreshold config and cachedReport.lastAckedAt field are currently never used, while the env var comment describes behavior that depends on them; either wire this threshold into the rate limiting/drop logic or remove the unused field/parameter to avoid confusion and dead code.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `NewUnconfirmedMessageHandler`, the cleanup goroutine closes `h.ch`/`h.ackCh` on `ctx.Done`, but `time.AfterFunc` callbacks in `onTimerFired` can still run and attempt to send on `h.ch`, which will panic if the close wins the race; consider not closing these channels or guarding sends with an additional stopper/signal instead of `ctx` alone.
- The VM relay’s `staleAckThreshold` config and `cachedReport.lastAckedAt` field are currently never used, while the env var comment describes behavior that depends on them; either wire this threshold into the rate limiting/drop logic or remove the unused field/parameter to avoid confusion and dead code.

## Individual Comments

### Comment 1
<location> `pkg/retry/handler/unconfirmed_message_handler.go:57-66` </location>
<code_context>
+	defer cancel()
+
+	errChan := make(chan error, 1)
+	go func() {
+		errChan <- relay.Run(ctx)
+	}()
</code_context>

<issue_to_address>
**issue (bug_risk):** Sending on closed channels is possible after context cancellation, which can panic.

`HandleACK` can still do a non-blocking `select { case h.ackCh <- resourceID: default: }` after cancellation, and `onTimerFired` can race with the cleanup goroutine when sending on `h.ch`, even though it checks `ctx.Err()`. Both cases can end up sending on a closed channel.

To avoid this, either:
- Don’t close `h.ch`/`h.ackCh` and let consumers exit on `ctx`, or
- Encapsulate send logic so you can reliably guard against closed channels, or
- Use a separate `stop`/done channel instead of closing the data channels.

Given the lifecycle here, relying on context cancellation and leaving the data channels open is likely the safest option.
</issue_to_address>
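The "separate stop channel" option above can be sketched as follows. This is a minimal illustration, not the PR's actual code: the `handler`/`trySend` names and the channel capacity are assumptions. The point is that the data channel is never closed, so a late `time.AfterFunc` callback can at worst drop its signal, never panic.

```go
package main

import (
	"fmt"
	"sync"
)

// handler guards every send with a dedicated stop channel instead of
// closing the data channel on shutdown.
type handler struct {
	ch       chan string
	stop     chan struct{}
	stopOnce sync.Once
}

func newHandler() *handler {
	return &handler{ch: make(chan string, 8), stop: make(chan struct{})}
}

// trySend is safe to call from a timer callback at any time: after Stop
// it drops the signal instead of panicking on a closed channel.
func (h *handler) trySend(id string) bool {
	select {
	case <-h.stop:
		return false // shutting down
	default:
	}
	select {
	case h.ch <- id:
		return true
	case <-h.stop:
		return false
	default:
		return false // channel full: non-blocking drop
	}
}

// Stop signals shutdown exactly once; h.ch stays open for consumers.
func (h *handler) Stop() { h.stopOnce.Do(func() { close(h.stop) }) }

func main() {
	h := newHandler()
	fmt.Println(h.trySend("vm-1")) // true
	h.Stop()
	fmt.Println(h.trySend("vm-2")) // false: simulates a timer firing after shutdown
}
```

Consumers exit on context cancellation rather than on channel close, which is the trade-off this pattern accepts.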

### Comment 2
<location> `compliance/virtualmachines/relay/relay.go:100` </location>
<code_context>
+			}
+			r.markAcked(vsockID)
+
+		case vsockID := <-r.umh.RetryCommand():
+			// Relay no longer stores reports; rely on agent resubmission.
+			log.Debugf("Retry requested for VSOCK %s, ignoring (no cached report retained)", vsockID)
</code_context>

<issue_to_address>
**issue (bug_risk):** RetryCommand receive ignores the `ok` value, which will treat a closed channel as a retry for an empty resource ID.

In the main `select` you correctly use a two-value receive for `AckedResources()`, but for `RetryCommand()` you only read a single value:

```go
case vsockID := <-r.umh.RetryCommand():
    // ...
```

If `RetryCommand()` is ever closed, this case will keep firing with `vsockID == ""`, producing bogus retries. It would be safer to mirror the ACK path and handle the `ok` value explicitly:

```go
case vsockID, ok := <-r.umh.RetryCommand():
    if !ok {
        log.Warn("UMH retry channel closed; stopping relay")
        return ctx.Err()
    }
    // existing logic
```

or otherwise explicitly detect and handle the closed-channel case.
</issue_to_address>

### Comment 3
<location> `compliance/virtualmachines/relay/relay.go:50-59` </location>
<code_context>
+	staleAckThreshold   time.Duration
</code_context>

<issue_to_address>
**issue (bug_risk):** The `staleAckThreshold` configuration is unused in the relay logic.

`Relay` stores `staleAckThreshold` (from `env.VMRelayStaleAckThreshold`), but nothing in the relay logic uses it when deciding to forward or drop reports. Given the env var docstring (“If rate limited and ACK is stale, the report is dropped...”), this looks like intended but missing behavior. Please either implement the stale-ACK handling (likely using `cache.updatedAt` / `lastAckedAt` when rate limiting applies), or remove the unused field and env var so the config isn’t a no-op.
</issue_to_address>

### Comment 4
<location> `central/sensor/service/pipeline/nodeinventory/pipeline.go:154` </location>
<code_context>
-	}
-}
+	// Always send SensorACK (new path).
+	_ = injector.InjectMessage(ctx, &central.MsgToSensor{
+		Msg: &central.MsgToSensor_SensorAck{
+			SensorAck: &central.SensorACK{
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Errors from InjectMessage are now ignored for both SensorACK and legacy NodeInventoryACK paths.

The previous `sendComplianceAck` logged a warning on `InjectMessage` failures for legacy NodeInventoryACK. The new code now sends both SensorACK and legacy ACK but discards both errors via `_ = injector.InjectMessage(...)`.

Silently swallowing these errors makes diagnosing connectivity/permission issues between Central and Sensor harder. Please at least log a warning if either injection fails—for example, collect both errors and emit a single warning if any is non-nil, even if you don’t return the error upstream.

Suggested implementation:

```golang
	// Always send SensorACK (new path).
	if err := injector.InjectMessage(ctx, &central.MsgToSensor{
		Msg: &central.MsgToSensor_SensorAck{
			SensorAck: &central.SensorACK{
				Action:      convertLegacyActionToSensor(t),
				MessageType: central.SensorACK_NODE_INVENTORY,
				ResourceId:  nodeName,
			},
		},
	}); err != nil {
		log.Warnf("failed to inject SensorACK for node inventory reply (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err)
	}

```

```golang
	// Always send legacy NodeInventoryACK for backward compatibility.
	// TODO(ROX-): remove once all sensors support SensorACK for node inventory.
	if err := injector.InjectMessage(ctx, &central.MsgToSensor{

```

To fully implement the suggested behavior for the legacy NodeInventoryACK path, you should:

1. Wrap the full legacy `InjectMessage` call in an `if err := ...; err != nil { ... }` block, similar to the SensorACK path. For example, once you can see the entire block, change:

```go
// Always send legacy NodeInventoryACK for backward compatibility.
if err := injector.InjectMessage(ctx, &central.MsgToSensor{
    Msg: &central.MsgToSensor_NodeInventoryAck{
        NodeInventoryAck: &central.NodeInventoryACK{
            // existing fields...
        },
    },
}); err != nil {
    log.Warnf("failed to inject legacy NodeInventoryACK for node inventory reply (clusterID=%s, nodeName=%s): %v", clusterID, nodeName, err)
}
```

2. Ensure that `log` is a valid logger in this file (most StackRox code uses `var log = logging.LoggerForModule()` at the top). If this file uses a different logger variable or logging package, adjust the `log.Warnf` calls accordingly.

This will ensure that both SensorACK and legacy NodeInventoryACK injection failures are logged, making connectivity/permission issues between Central and Sensor diagnosable without changing the function’s return behavior.
</issue_to_address>

### Comment 5
<location> `central/sensor/service/pipeline/nodeindex/pipeline.go:134` </location>
<code_context>
-	}
-}
+	// Always send SensorACK (new path).
+	_ = injector.InjectMessage(ctx, &central.MsgToSensor{
+		Msg: &central.MsgToSensor_SensorAck{
+			SensorAck: &central.SensorACK{
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Similar to node inventory, NodeIndex ACK injection errors are ignored, which can hide delivery problems.

This change now drops any `InjectMessage` errors, whereas previously ACK send failures were logged. Even if the pipeline shouldn’t fail on these errors, they should still be logged (with the node identifier) when either the SensorACK or legacy NodeInventoryACK injection fails, to preserve operational visibility into delivery issues.

Suggested implementation:

```golang
	// Always send SensorACK (new path).
	if err := injector.InjectMessage(ctx, &central.MsgToSensor{
		Msg: &central.MsgToSensor_SensorAck{
			SensorAck: &central.SensorACK{
				Action:      central.SensorACK_ACK,
				MessageType: central.SensorACK_NODE_INDEX_REPORT,
				ResourceId:  node.GetName(),
			},
		},
	}); err != nil {
		// Do not fail the pipeline on ACK delivery problems, but preserve visibility.
		log.Errorf("failed to inject SensorACK for node index report (node=%s): %v", node.GetName(), err)
	}

	// Always send legacy NodeInventoryACK for backward compatibility.

```

1. Apply the same pattern to the legacy `NodeInventoryACK` injection below this snippet:
   - Capture the error returned by `injector.InjectMessage`.
   - Log it (with node identifier) instead of ignoring it, without changing control flow.
2. Ensure the logger (`log.Errorf` or equivalent) matches existing logging conventions in this package (e.g., you may need to use a package-specific logger or `log.Warnf` if that is preferred).
3. If `log` is not already imported/available, add the appropriate import or use the locally established logging helper consistent with the rest of `pipeline.go`.
</issue_to_address>

### Comment 6
<location> `central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go:439-448` </location>
<code_context>
+func TestSendVMIndexReportResponse_SensorACKAndLegacyFallback(t *testing.T) {
</code_context>

<issue_to_address>
**suggestion (testing):** Consider extending coverage to NACK responses and reason propagation in VM index ACK tests

This test covers the ACK path with SensorACK capability and the no-capability case. To fully validate the new behavior, please also add: (1) a case where `action` is `central.SensorACK_NACK` that asserts the SensorACK `action` and `reason`, and (2) a case where a NACK is sent but `HasCapability` returns false, confirming no SensorACK is produced. This will cover the negative path and reason-handling logic for VM index responses.

Suggested implementation:

```golang
	err := pipeline.Run(ctx, testClusterID, msg, nil)
	assert.NoError(t, err, "rate limited request should not return error (NACK sent instead)")
}

func TestSendVMIndexReportResponse_SensorACKNACK(t *testing.T) {
	t.Setenv(features.VirtualMachines.EnvVar(), "true")
	ctx := context.Background()

	// Capability path: NACK should produce SensorACK with reason
	ackInjector := &vmRecordingInjector{hasAckCap: true}
	sendVMIndexReportResponse(ctx, "vm-nack-1", central.SensorACK_NACK, "validation failed", ackInjector)

	require.Len(t, ackInjector.sensorAcks, 1)
	protoassert.Equal(t, &central.SensorACK{
		Action:      central.SensorACK_NACK,
		MessageType: central.SensorACK_VM_INDEX_REPORT,
		ResourceId:  "vm-nack-1",
		Reason:      "validation failed",
	}, ackInjector.sensorAcks[0])
}

func TestSendVMIndexReportResponse_SensorACKNACK_LegacyFallback(t *testing.T) {
	t.Setenv(features.VirtualMachines.EnvVar(), "true")
	ctx := context.Background()

	// Legacy path (no capability): NACK should not produce a SensorACK
	ackInjector := &vmRecordingInjector{hasAckCap: false}
	sendVMIndexReportResponse(ctx, "vm-nack-2", central.SensorACK_NACK, "legacy path", ackInjector)

	require.Len(t, ackInjector.sensorAcks, 0)
}

func TestSendVMIndexReportResponse_SensorACKAndLegacyFallback(t *testing.T) {

```

1. These tests assume that `context` is already imported (likely true since other tests use `ctx`). If it is not, add `import "context"` to the import block.
2. The tests also assume that `vmRecordingInjector` records `SensorACK` messages in a `sensorAcks` slice and that `sendVMIndexReportResponse` respects `hasAckCap` for emitting SensorACKs. If field names are different, adjust `vmRecordingInjector{hasAckCap: ...}` and `ackInjector.sensorAcks` accordingly.
</issue_to_address>

### Comment 7
<location> `pkg/retry/handler/unconfirmed_message_handler_test.go:123` </location>
<code_context>
 	}
 }
+
+func (suite *UnconfirmedMessageHandlerTestSuite) TestMultipleResources() {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests for AckedResources channel behavior and shutdown semantics of the new UMH

`TestMultipleResources` covers per-resource retry behavior well. To fully exercise the new API, please add tests that (1) verify ACKs cause the correct resource IDs to be emitted on `AckedResources()` and remain non-blocking when the channel is full, and (2) verify that after the context is canceled, both `RetryCommand()` and `AckedResources()` channels are closed and no further retries are emitted. This will validate the new observer channel and its shutdown semantics so callers can safely range over these channels.

Suggested implementation:

```golang
func (suite *UnconfirmedMessageHandlerTestSuite) TestMultipleResources() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	baseInterval := time.Second
	umh := NewUnconfirmedMessageHandler(ctx, "test", baseInterval)

	// Send for two different resources
	umh.ObserveSending("resource-1")
	umh.ObserveSending("resource-2")

	// ACK only resource-1 and verify it is emitted on AckedResources().
	umh.HandleACK("resource-1")

	select {
	case resourceID := <-umh.AckedResources():
		if resourceID != "resource-1" {
			suite.T().Fatalf("expected acked resource %q, got %q", "resource-1", resourceID)
		}
	case <-time.After(2 * time.Second):
		suite.T().Fatal("timed out waiting for acked resource on AckedResources()")
	}

	// Verify HandleACK remains non-blocking even when many ACKs are sent (simulating a full channel).
	nonBlockingDone := make(chan struct{})
	go func() {
		defer close(nonBlockingDone)
		for i := 0; i < 100; i++ {
			umh.HandleACK(fmt.Sprintf("resource-%d", i))
		}
	}()

	select {
	case <-nonBlockingDone:
		// OK: HandleACK did not block even under load.
	case <-time.After(500 * time.Millisecond):
		suite.T().Fatal("HandleACK blocked when AckedResources channel should remain non-blocking")
	}

	// Verify shutdown semantics:
	// After context cancellation, callers can safely range over RetryCommand() and AckedResources()
	// and no further retries are emitted.
	retryCh := umh.RetryCommand()
	ackedCh := umh.AckedResources()

	// Ensure there is at least one in-flight retry for resource-2 before we cancel, so that
	// the retry machinery is active.
	umh.HandleNACK("resource-2")

	select {
	case <-retryCh:
		// A retry for resource-2 has been emitted; the retry loop is active.
	case <-time.After(3 * baseInterval):
		suite.T().Fatal("timed out waiting for retry before canceling context")
	}

	// Now cancel the context and verify both observer channels eventually close.
	cancel()

	channelsClosed := make(chan struct{})
	go func() {
		defer close(channelsClosed)

		// Ranging over these channels must eventually complete once the handler shuts down.
		for range retryCh {
		}
		for range ackedCh {
		}
	}()

	select {
	case <-channelsClosed:
		// OK: both channels have been closed and are safe to range over.
	case <-time.After(2 * time.Second):
		suite.T().Fatal("timed out waiting for RetryCommand and AckedResources channels to close after cancel")
	}

```

1. The updated test uses `fmt.Sprintf`, so ensure `fmt` is imported at the top of `unconfirmed_message_handler_test.go` if it is not already:

   ```go
   import (
       "context"
       "fmt"
       "time"
       // other imports...
   )
   ```

2. If your project uses `require`/`assert` from `testify` in this test file, you may prefer to replace the `suite.T().Fatal` / `Fatalf` calls with `require.FailNow` / `require.Equal` for consistency.
3. This test assumes `AckedResources()` and `RetryCommand()` return the same underlying channels on each call (read-only or bidirectional). If the API differs (e.g., they return new channels or typed commands), adjust the type assertions and equality checks accordingly.
</issue_to_address>

### Comment 8
<location> `compliance/virtualmachines/relay/relay_test.go:215` </location>
<code_context>
 }

+// TestRelay_RateLimiting verifies that rate limiting works
+func (s *relayTestSuite) TestRelay_RateLimiting() {
+	// Create mock sender
+	done := concurrency.NewSignal()
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding tests for UMH interaction in the relay, not just rate limiting

Since the relay now wires in an `UnconfirmedMessageHandler`, it would be valuable to add a test using a `mockUMH` that records `ObserveSending` calls and emits ACKs via `AckedResources()`. That test could assert that `ObserveSending` is called with the correct VSOCK ID when a report is forwarded, that ACKs trigger `markAcked` updates without disrupting normal processing, and that `RetryCommand()` messages are ignored as expected.
</issue_to_address>
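A `mockUMH` along these lines could back such a test. The method set follows the `UnconfirmedMessageHandler` interface shown elsewhere in this review; the recording fields (`observed`) and buffer sizes are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

// mockUMH records ObserveSending calls and lets a test emit ACKs and
// retry commands on demand via its channels.
type mockUMH struct {
	mu       sync.Mutex
	observed []string
	retryCh  chan string
	ackedCh  chan string
}

func newMockUMH() *mockUMH {
	return &mockUMH{retryCh: make(chan string, 1), ackedCh: make(chan string, 1)}
}

func (m *mockUMH) ObserveSending(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.observed = append(m.observed, id)
}

func (m *mockUMH) HandleACK(string)              {}
func (m *mockUMH) HandleNACK(string)             {}
func (m *mockUMH) RetryCommand() <-chan string   { return m.retryCh }
func (m *mockUMH) AckedResources() <-chan string { return m.ackedCh }

func main() {
	m := newMockUMH()
	m.ObserveSending("cid-3") // relay would call this when forwarding
	m.ackedCh <- "cid-3"      // test emits an ACK for the relay to consume
	fmt.Println(m.observed, <-m.AckedResources())
}
```

A relay test would inject this mock, forward a report, and assert `observed` contains the VSOCK ID before emitting ACKs or retry commands.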

### Comment 9
<location> `pkg/retry/handler/unconfirmed_message_handler.go:197` </location>
<code_context>
+	}
+}
+
+// calculateNextInterval returns the next retry interval with exponential backoff.
+func (h *UnconfirmedMessageHandlerImpl) calculateNextInterval(retry int32) time.Duration {
+	nextSeconds := (retry + 1) * int32(h.baseInterval.Seconds())
</code_context>

<issue_to_address>
**issue (complexity):** Consider simplifying the retry backoff math and unifying the locking style in `onTimerFired` to make the handler’s behavior easier to reason about.

You can simplify a couple of things without changing behavior:

### 1. Remove `fmt.Sprintf` + `time.ParseDuration` in `calculateNextInterval`

You already work with `time.Duration`, so you can avoid string round-trips and make the backoff clearer:

```go
// calculateNextInterval returns the next retry interval with exponential backoff.
func (h *UnconfirmedMessageHandlerImpl) calculateNextInterval(retry int32) time.Duration {
	if h.baseInterval <= 0 {
		return defaultBaseInterval
	}

	next := time.Duration(retry+1) * h.baseInterval
	if next <= 0 { // overflow guard
		return defaultBaseInterval
	}
	return next
}
```

This keeps the same semantics (“(retry+1) * baseInterval”, with a sane fallback) but is easier to reason about and removes the error path.

### 2. Use a single locking style in `onTimerFired`

Right now `onTimerFired` is the only place using `concurrency.WithLock`, while the rest of the type uses explicit `mu.Lock`/`Unlock`. Using one style everywhere makes invariants easier to track, especially with timers firing concurrently.

You can inline the lock and keep the rest of the logic as-is:

```go
func (h *UnconfirmedMessageHandlerImpl) onTimerFired(resourceID string) {
	if h.ctx.Err() != nil {
		return
	}

	shouldSignal := false

	h.mu.Lock()
	state, exists := h.resources[resourceID]
	if exists && state.numUnackedSendings > 0 {
		state.retry++
		nextInterval := h.calculateNextInterval(state.retry)

		log.Infof("[%s] Resource %s has %d unacked messages, suggesting retry %d (next in %s)",
			h.handlerName, resourceID, state.numUnackedSendings, state.retry, nextInterval)

		h.resetTimer(resourceID, state, nextInterval)
		shouldSignal = true
	}
	h.mu.Unlock()

	if !shouldSignal {
		return
	}

	select {
	case h.ch <- resourceID:
	default:
		log.Warnf("[%s] Retry channel full, dropping retry signal for %s", h.handlerName, resourceID)
	}
}
```

This preserves the behavior (including non-blocking send and timer reset) but removes the mixed locking patterns, which should make future changes safer and easier to reason about.
</issue_to_address>

### Comment 10
<location> `compliance/virtualmachines/relay/relay.go:26` </location>
<code_context>
 	Start(ctx context.Context) (<-chan *v1.IndexReport, error)
 }

+// cachedReport holds metadata about the last update/ACK timestamps for a VSOCK ID.
+type cachedReport struct {
+	updatedAt   time.Time
</code_context>

<issue_to_address>
**issue (complexity):** Consider removing the unused cache, stale-ACK fields, and ACK channel handling until they influence behavior to keep the relay’s logic smaller and easier to understand.

`cachedReport`, `staleAckThreshold`, and the ACK channel wiring add state and locking without affecting behavior. You can drop them for now and reintroduce when the stale-ACK logic is actually used. This keeps rate limiting + UMH retry integration while reducing cognitive load.

**1. Remove unused cache-related state and parameter**

```go
// Before
type Relay struct {
    reportStream IndexReportStream
    reportSender sender.IndexReportSender
    umh          UnconfirmedMessageHandler

    // Rate limiting config
    maxReportsPerMinute float64
    staleAckThreshold   time.Duration

    // cache stores metadata for each VSOCK ID.
    cache map[string]*cachedReport
    // limiters stores per-VSOCK rate limiters (leaky bucket, no burst).
    limiters map[string]*rate.Limiter
    // mu guards cache and limiters.
    mu sync.Mutex
}
```

```go
// After
type Relay struct {
    reportStream IndexReportStream
    reportSender sender.IndexReportSender
    umh          UnconfirmedMessageHandler

    maxReportsPerMinute float64

    limiters map[string]*rate.Limiter
    mu       sync.Mutex
}
```

```go
// Before
func New(
    reportStream IndexReportStream,
    reportSender sender.IndexReportSender,
    umh UnconfirmedMessageHandler,
    maxReportsPerMinute float64,
    staleAckThreshold time.Duration,
) *Relay {
    return &Relay{
        reportStream:        reportStream,
        reportSender:        reportSender,
        umh:                 umh,
        maxReportsPerMinute: maxReportsPerMinute,
        staleAckThreshold:   staleAckThreshold,
        cache:               make(map[string]*cachedReport),
        limiters:            make(map[string]*rate.Limiter),
    }
}
```

```go
// After
func New(
    reportStream IndexReportStream,
    reportSender sender.IndexReportSender,
    umh UnconfirmedMessageHandler,
    maxReportsPerMinute float64,
) *Relay {
    return &Relay{
        reportStream:        reportStream,
        reportSender:        reportSender,
        umh:                 umh,
        maxReportsPerMinute: maxReportsPerMinute,
        limiters:            make(map[string]*rate.Limiter),
    }
}
```

**2. Drop `cachedReport`, `cacheReport`, `markAcked`, and `AckedResources` until they’re used**

```go
// Before
type UnconfirmedMessageHandler interface {
    HandleACK(resourceID string)
    HandleNACK(resourceID string)
    ObserveSending(resourceID string)
    RetryCommand() <-chan string
    AckedResources() <-chan string
}
```

```go
// After
type UnconfirmedMessageHandler interface {
    HandleACK(resourceID string)
    HandleNACK(resourceID string)
    ObserveSending(resourceID string)
    RetryCommand() <-chan string
    // AckedResources can be added back once ACK metadata affects behavior.
}
```

```go
// Before (Run)
for {
    select {
    case <-ctx.Done():
        return ctx.Err()

    case vsockID, ok := <-r.umh.AckedResources():
        if !ok {
            log.Warn("UMH ack channel closed; stopping relay")
            return ctx.Err()
        }
        r.markAcked(vsockID)

    case vsockID := <-r.umh.RetryCommand():
        log.Debugf("Retry requested for VSOCK %s, ignoring (no cached report retained)", vsockID)

    case report := <-reportChan:
        ...
        r.handleIncomingReport(ctx, report)
    }
}
```

```go
// After (Run)
for {
    select {
    case <-ctx.Done():
        return ctx.Err()

    case vsockID := <-r.umh.RetryCommand():
        log.Debugf("Retry requested for VSOCK %s, ignoring (no cached report retained)", vsockID)

    case report := <-reportChan:
        if report == nil {
            log.Warn("Received nil report, skipping")
            continue
        }
        r.handleIncomingReport(ctx, report)
    }
}
```

```go
// Before (handleIncomingReport)
func (r *Relay) handleIncomingReport(ctx context.Context, report *v1.IndexReport) {
    vsockID := report.GetVsockCid()

    // Always cache metadata for the latest report
    r.cacheReport(report)

    // Check rate limit
    if !r.tryConsume(vsockID) { ... }

    r.umh.ObserveSending(vsockID)
    if err := r.reportSender.Send(ctx, report); err != nil { ... }
}
```

```go
// After (handleIncomingReport)
func (r *Relay) handleIncomingReport(ctx context.Context, report *v1.IndexReport) {
    vsockID := report.GetVsockCid()

    if !r.tryConsume(vsockID) {
        log.Debugf("Rate limited for VSOCK %s; dropping report and relying on agent retry", vsockID)
        return
    }

    r.umh.ObserveSending(vsockID)
    if err := r.reportSender.Send(ctx, report); err != nil {
        log.Errorf("Failed to send report (vsock CID: %s): %v", vsockID, err)
    }
}
```

By pruning the unused cache and ACK channel path, `Run` focuses on the behaviors that actually matter today (report processing, rate limiting, and UMH retry commands), and you avoid carrying around speculative state until it’s wired into real decisions.
</issue_to_address>



rhacs-bot commented Dec 17, 2025

Images are ready for the commit at 0eba237.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.10.x-802-g0eba237197.


vikin91 commented Dec 17, 2025

@sourcery-ai recheck


sourcery-ai bot commented Dec 17, 2025

Sure! I'm generating a new review now.


codecov bot commented Dec 17, 2025

Codecov Report

❌ Patch coverage is 69.59799% with 121 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.88%. Comparing base (a06a205) to head (4ed566f).

Files with missing lines Patch % Lines
compliance/compliance.go 48.88% 46 Missing ⚠️
sensor/common/virtualmachine/index/handler_impl.go 7.69% 23 Missing and 1 partial ⚠️
compliance/virtualmachines/relay/relay.go 80.20% 17 Missing and 2 partials ⚠️
pkg/retry/handler/unconfirmed_message_handler.go 86.20% 12 Missing and 4 partials ⚠️
.../sensor/service/pipeline/nodeinventory/pipeline.go 62.50% 6 Missing and 3 partials ⚠️
...tral/sensor/service/pipeline/nodeindex/pipeline.go 75.00% 2 Missing and 2 partials ⚠️
...r/common/compliance/node_inventory_handler_impl.go 0.00% 3 Missing ⚠️
Additional details and impacted files
@@                        Coverage Diff                         @@
##           piotr/ROX-32316-sensor-new-ack   #18267      +/-   ##
==================================================================
- Coverage                           48.89%   48.88%   -0.02%     
==================================================================
  Files                                2627     2628       +1     
  Lines                              198377   198969     +592     
==================================================================
+ Hits                                96998    97257     +259     
- Misses                              93977    94309     +332     
- Partials                             7402     7403       +1     
Flag Coverage Δ
go-unit-tests 48.88% <69.59%> (-0.02%) ⬇️



@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-new-ack branch from c3bb1ab to 37454fc on December 17, 2025 15:56
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from 1c2b1fa to 6721dbc on December 17, 2025 15:56
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from d69f9f0 to 1893f9b on December 18, 2025 11:16
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-new-ack branch from 37454fc to 62eb393 on December 18, 2025 11:33
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from 59d02f9 to b9d3498 on December 18, 2025 11:33
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-new-ack branch from 62eb393 to a06a205 on January 2, 2026 14:29
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from cd0f70d to 4ed566f on January 2, 2026 14:29
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-new-ack branch from a06a205 to e8c1230 on January 6, 2026 11:01
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from 4ed566f to 765b879 on January 6, 2026 11:03
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-new-ack branch from e8c1230 to 967f10d on January 7, 2026 16:04
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from 765b879 to e929947 on January 7, 2026 16:04
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-new-ack branch from 967f10d to 69b8da0 on January 7, 2026 16:08
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from e929947 to 77ea3b5 on January 7, 2026 16:08
@vikin91 vikin91 force-pushed the piotr/ROX-32316-sensor-new-ack branch from 69b8da0 to ee9062c on January 8, 2026 16:57
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from 77ea3b5 to d866337 on January 8, 2026 16:57
@vikin91 vikin91 force-pushed the piotr/ROX-32316-central-rate-limiter branch from bbc57dc to 243701d on January 14, 2026 10:58
Introduce a reusable token bucket rate limiter in pkg/rate that provides
per-sensor fair rate limiting. Each sensor gets an equal share (1/N) of
the global rate with automatic rebalancing on connect/disconnect.

Key features:
- Float64 rate support for sub-1 req/sec limits (e.g., 0.5)
- Configurable via ROX_VM_INDEX_REPORT_RATE_LIMIT (default: 2.0 req/s)
- Per-sensor bucket capacity via ROX_VM_INDEX_REPORT_BUCKET_CAPACITY
- Prometheus metrics for accepted/rejected requests
- Global registry for multiple workload types

Integrated into virtualmachineindex pipeline with ACK/NACK responses
sent back to Sensor when rate limit is exceeded.

AI-assisted: Initial implementation and tests generated by AI.
User-reviewed: Architecture decisions, naming conventions, float64 rate
type, error handling (return errors vs panic), and test robustness.
This is to avoid state explosion when using many client IDs.
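The 1/N fair-share rule described in this commit message can be sketched as below. This is an illustration of the stated design, not the actual `pkg/rate` implementation; `perSensorRate` is a hypothetical name.

```go
package main

import "fmt"

// perSensorRate splits a global float64 rate evenly across connected
// sensors; rebalancing means recomputing this whenever a sensor
// connects or disconnects.
func perSensorRate(globalRate float64, connectedSensors int) float64 {
	if connectedSensors <= 0 {
		return globalRate // no sensors to share with
	}
	return globalRate / float64(connectedSensors)
}

func main() {
	// Default ROX_VM_INDEX_REPORT_RATE_LIMIT of 2.0 req/s shared by 4 sensors:
	fmt.Println(perSensorRate(2.0, 4)) // 0.5
}
```

Float64 rates matter here precisely because the per-sensor share drops below 1 req/s as sensor count grows.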
… via UMH

Why:
  • dual-send SensorACK plus legacy ACKs for node inventory/index, while keeping VM index SensorACK-only
  • forward VM SensorACKs to Compliance/UMH
  • remove the relay’s cached-payload retries and rely on agent resend
  • add compliance send metrics
  • simplify the sender to single-attempt
  • tighten UMH timer/lock handling
  • minor env/rate tweaks

refactor(umh): replace AckedResources channel with OnACK callback

Simplifies ACK handling by using callback instead of channel.
Callback invoked outside lock to avoid deadlocks.

AI-generated, user-corrected locking and test sync patterns.
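The "callback invoked outside lock" pattern this commit describes can be sketched as follows. The `ackHandler` type and field names are illustrative assumptions; the point is that state is updated and the callback captured under the mutex, but the callback runs after the mutex is released, so a callback that re-enters the handler cannot deadlock.

```go
package main

import (
	"fmt"
	"sync"
)

// ackHandler invokes OnACK outside its mutex to avoid deadlocks when
// the callback calls back into the handler.
type ackHandler struct {
	mu    sync.Mutex
	seen  map[string]bool
	OnACK func(resourceID string)
}

func (h *ackHandler) HandleACK(id string) {
	h.mu.Lock()
	first := !h.seen[id]
	h.seen[id] = true
	cb := h.OnACK // capture under lock
	h.mu.Unlock() // release before invoking the callback

	if first && cb != nil {
		cb(id)
	}
}

func main() {
	var fired []string
	h := &ackHandler{seen: map[string]bool{}, OnACK: func(id string) { fired = append(fired, id) }}
	h.HandleACK("vm-1")
	h.HandleACK("vm-1") // duplicate ACK: callback fires only once
	fmt.Println(fired)  // [vm-1]
}
```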
@vikin91 vikin91 changed the base branch from piotr/ROX-32316-central-rate-limiter to master on January 14, 2026 11:00
@vikin91 vikin91 force-pushed the piotr/ROX-32316-compliance-retry-logic branch from 3736e91 to 0eba237 on January 14, 2026 11:00

vikin91 commented Jan 14, 2026

Closing in favor of #18488 and #18489

@vikin91 vikin91 closed this Jan 14, 2026