Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions compliance/compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,33 @@ func (c *Compliance) Start() {
// The virtual machine relay (ROX-30476), which reads VM index reports from vsock connections and forwards them to
// sensor, is currently started and run in the compliance container. This enables reusing the existing connection to
// sensor and accelerates initial development.
//
// Startup flow: stream.New() binds to vsock (via vsock.NewListener). If vsock is unavailable (e.g. KubeVirt not yet
// installed), the bind fails. Retry with backoff below recovers when vsock becomes available after KubeVirt is
// installed; without retry the relay would be disabled for the pod lifetime until restart.
go func(ctx context.Context) {
defer wg.Add(-1)
if features.VirtualMachines.Enabled() {
log.Infof("Virtual machine relay enabled")
if !features.VirtualMachines.Enabled() {
return
}
log.Infof("Virtual machine relay enabled")

reportStream, err := stream.New()
if err != nil {
log.Errorf("Error creating report stream: %v", err)
reportStream, err := createVMRelayStreamWithRetry(ctx, stream.New)
if err != nil {
if ctx.Err() != nil {
log.Info("Virtual machine relay startup cancelled during vsock retry")
return
}
log.Errorf("Error creating report stream after retries: %v", err)
return
}

sensorClient := sensor.NewVirtualMachineIndexReportServiceClient(conn)
reportSender := sender.New(sensorClient)
sensorClient := sensor.NewVirtualMachineIndexReportServiceClient(conn)
reportSender := sender.New(sensorClient)

vmRelay := relay.New(reportStream, reportSender)
if err := vmRelay.Run(ctx); err != nil {
log.Errorf("Error running virtual machine relay: %v", err)
}
vmRelay := relay.New(reportStream, reportSender)
if err := vmRelay.Run(ctx); err != nil {
log.Errorf("Error running virtual machine relay: %v", err)
}
}(ctx)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,28 @@ type VsockIndexReportStream struct {
// New creates a VsockIndexReportStream with a vsock listener.
// Concurrency limits are read from env vars VirtualMachinesMaxConcurrentVsockConnections
// and VirtualMachinesConcurrencyTimeout.
//
// Bind-failure return path: vsock.NewListener() performs a one-shot bind. If vsock support is unavailable
// (e.g. KubeVirt not yet installed), the error propagates to the caller. There is no retry here; the caller
// (compliance) is responsible for retrying if desired.
func New() (*VsockIndexReportStream, error) {
listener, err := vsock.NewListener()
if err != nil {
return nil, errors.Wrap(err, "creating vsock listener")
}
return newWithListener(listener), nil
}

// NewWithListener creates a VsockIndexReportStream with the given listener.
// For testing only; use New() in production.
func NewWithListener(listener net.Listener) (*VsockIndexReportStream, error) {
if listener == nil {
return nil, errors.New("listener is nil")
}
return newWithListener(listener), nil
}

func newWithListener(listener net.Listener) *VsockIndexReportStream {
maxConcurrentConnections := env.VirtualMachinesMaxConcurrentVsockConnections.IntegerSetting()
semaphoreTimeout := env.VirtualMachinesConcurrencyTimeout.DurationSetting()
maxSizeBytes := env.VirtualMachinesVsockConnMaxSizeKB.IntegerSetting() * 1024
Expand All @@ -56,7 +72,7 @@ func New() (*VsockIndexReportStream, error) {
connectionReadTimeout: 10 * time.Second,
waitAfterFailedAccept: time.Second,
maxSizeBytes: maxSizeBytes,
}, nil
}
}

// Start begins accepting vsock connections and returns a channel of validated reports.
Expand Down
3 changes: 3 additions & 0 deletions compliance/virtualmachines/relay/vsock/vsock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func ExtractVsockCIDFromConnection(conn net.Conn) (uint32, error) {

// NewListener creates a vsock listener on the host context ID (vsock.Host) using the port
// from VirtualMachinesVsockPort env var. Caller must close the returned listener.
//
// One-shot bind: ListenContextID fails immediately if vsock is unsupported (e.g. kernel module not loaded,
// KubeVirt not installed). The caller receives the error; there is no retry at this layer.
func NewListener() (net.Listener, error) {
port := env.VirtualMachinesVsockPort.IntegerSetting()
listener, err := vsock.ListenContextID(vsock.Host, uint32(port), nil)
Expand Down
39 changes: 39 additions & 0 deletions compliance/vm_relay_retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package compliance

import (
"context"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/stackrox/rox/compliance/virtualmachines/relay/stream"
"github.com/stackrox/rox/pkg/logging"
)

var vmRelayLog = logging.LoggerForModule()

// createVMRelayStreamWithRetry creates a vsock stream with retry and backoff.
// Retries until the factory succeeds or ctx is cancelled. Used to recover when vsock
// becomes available after KubeVirt is installed.
func createVMRelayStreamWithRetry(ctx context.Context, createStream func() (*stream.VsockIndexReportStream, error)) (*stream.VsockIndexReportStream, error) {
eb := backoff.NewExponentialBackOff()
eb.MaxInterval = 30 * time.Second
eb.MaxElapsedTime = 0 // Retry indefinitely until vsock becomes available or ctx cancelled
b := backoff.WithContext(eb, ctx)

var reportStream *stream.VsockIndexReportStream
operation := func() error {
var err error
reportStream, err = createStream()
return err
}
notify := func(err error, d time.Duration) {
vmRelayLog.Infof("Vsock bind failed, retrying in %0.2f seconds: %v", d.Seconds(), err)
}
if err := backoff.RetryNotify(operation, b, notify); err != nil {
if ctx.Err() != nil {
return nil, ctx.Err()
}
return nil, err
}
return reportStream, nil
}
75 changes: 75 additions & 0 deletions compliance/vm_relay_retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package compliance

import (
"context"
"errors"
"net"
"sync/atomic"
"testing"
"time"

"github.com/stackrox/rox/compliance/virtualmachines/relay/stream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCreateVMRelayStreamWithRetry_SucceedsAfterFailures(t *testing.T) {
failCount := 2
attempts := atomic.Int32{}

createStream := func() (*stream.VsockIndexReportStream, error) {
n := attempts.Add(1)
// Simulate failure cases: return an error until the success case is reached.
if n <= int32(failCount) {
return nil, errors.New("vsock not available")
}
// Success case: use a TCP listener as a stand-in for vsock (test-only).
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() { _ = listener.Close() })
s, err := stream.NewWithListener(listener)
require.NoError(t, err)
return s, nil
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

reportStream, err := createVMRelayStreamWithRetry(ctx, createStream)
require.NoError(t, err)
require.NotNil(t, reportStream)
assert.GreaterOrEqual(t, attempts.Load(), int32(failCount+1), "should have retried at least %d times before success", failCount+1)
}

func TestCreateVMRelayStreamWithRetry_CancellationStopsRetryPromptly(t *testing.T) {
attempts := atomic.Int32{}
createStream := func() (*stream.VsockIndexReportStream, error) {
attempts.Add(1)
return nil, errors.New("vsock not available")
}

ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel immediately

_, err := createVMRelayStreamWithRetry(ctx, createStream)

require.Error(t, err)
assert.ErrorIs(t, err, context.Canceled)
assert.LessOrEqual(t, attempts.Load(), int32(1), "should not retry after cancellation")
}

func TestCreateVMRelayStreamWithRetry_SucceedsImmediately(t *testing.T) {
createStream := func() (*stream.VsockIndexReportStream, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() { _ = listener.Close() })
return stream.NewWithListener(listener)
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

reportStream, err := createVMRelayStreamWithRetry(ctx, createStream)
require.NoError(t, err)
require.NotNil(t, reportStream)
}
Loading