From e56dae03ffaa49dcb113789369cdb5b651f8d402 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Thu, 12 Mar 2026 12:34:13 +0100 Subject: [PATCH] fix(compliance): retry VM relay vsock bind until KubeVirt is available When StackRox starts before KubeVirt, the VM relay's vsock bind fails once and never retries, leaving the relay disabled for the pod lifetime. Add exponential backoff retry around stream creation so the relay recovers when vsock becomes available after KubeVirt is installed. - Extract createVMRelayStreamWithRetry() with context-aware retry loop - Add stream.NewWithListener() for tests - Add unit tests for retry success, cancellation, and immediate success - Document bind-failure flow in compliance, stream, and vsock packages AI-generated: retry logic, tests, NewWithListener, and documentation. User-reviewed: design and implementation. --- compliance/compliance.go | 31 +++++--- .../relay/stream/vsock_index_report_stream.go | 18 ++++- .../virtualmachines/relay/vsock/vsock.go | 3 + compliance/vm_relay_retry.go | 39 ++++++++++ compliance/vm_relay_retry_test.go | 75 +++++++++++++++++++ 5 files changed, 154 insertions(+), 12 deletions(-) create mode 100644 compliance/vm_relay_retry.go create mode 100644 compliance/vm_relay_retry_test.go diff --git a/compliance/compliance.go b/compliance/compliance.go index 08f0c42bfeebf..8ead20cbfc2aa 100644 --- a/compliance/compliance.go +++ b/compliance/compliance.go @@ -125,24 +125,33 @@ func (c *Compliance) Start() { // The virtual machine relay (ROX-30476), which reads VM index reports from vsock connections and forwards them to // sensor, is currently started and run in the compliance container. This enables reusing the existing connection to // sensor and accelerates initial development. + // + // Startup flow: stream.New() binds to vsock (via vsock.NewListener). If vsock is unavailable (e.g. KubeVirt not yet + // installed), the bind fails. 
Retry with backoff below recovers when vsock becomes available after KubeVirt is + // installed; without retry the relay would be disabled for the pod lifetime until restart. go func(ctx context.Context) { defer wg.Add(-1) - if features.VirtualMachines.Enabled() { - log.Infof("Virtual machine relay enabled") + if !features.VirtualMachines.Enabled() { + return + } + log.Infof("Virtual machine relay enabled") - reportStream, err := stream.New() - if err != nil { - log.Errorf("Error creating report stream: %v", err) + reportStream, err := createVMRelayStreamWithRetry(ctx, stream.New) + if err != nil { + if ctx.Err() != nil { + log.Info("Virtual machine relay startup cancelled during vsock retry") return } + log.Errorf("Error creating report stream after retries: %v", err) + return + } - sensorClient := sensor.NewVirtualMachineIndexReportServiceClient(conn) - reportSender := sender.New(sensorClient) + sensorClient := sensor.NewVirtualMachineIndexReportServiceClient(conn) + reportSender := sender.New(sensorClient) - vmRelay := relay.New(reportStream, reportSender) - if err := vmRelay.Run(ctx); err != nil { - log.Errorf("Error running virtual machine relay: %v", err) - } + vmRelay := relay.New(reportStream, reportSender) + if err := vmRelay.Run(ctx); err != nil { + log.Errorf("Error running virtual machine relay: %v", err) } }(ctx) diff --git a/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go b/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go index 22011519e97ac..b640d85d60a04 100644 --- a/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go +++ b/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go @@ -38,12 +38,28 @@ type VsockIndexReportStream struct { // New creates a VsockIndexReportStream with a vsock listener. // Concurrency limits are read from env vars VirtualMachinesMaxConcurrentVsockConnections // and VirtualMachinesConcurrencyTimeout. 
+// +// Bind-failure return path: vsock.NewListener() performs a one-shot bind. If vsock support is unavailable +// (e.g. KubeVirt not yet installed), the error propagates to the caller. There is no retry here; the caller +// (compliance) is responsible for retrying if desired. func New() (*VsockIndexReportStream, error) { listener, err := vsock.NewListener() if err != nil { return nil, errors.Wrap(err, "creating vsock listener") } + return newWithListener(listener), nil +} + +// NewWithListener creates a VsockIndexReportStream with the given listener. +// For testing only; use New() in production. +func NewWithListener(listener net.Listener) (*VsockIndexReportStream, error) { + if listener == nil { + return nil, errors.New("listener is nil") + } + return newWithListener(listener), nil +} +func newWithListener(listener net.Listener) *VsockIndexReportStream { maxConcurrentConnections := env.VirtualMachinesMaxConcurrentVsockConnections.IntegerSetting() semaphoreTimeout := env.VirtualMachinesConcurrencyTimeout.DurationSetting() maxSizeBytes := env.VirtualMachinesVsockConnMaxSizeKB.IntegerSetting() * 1024 @@ -56,7 +72,7 @@ func New() (*VsockIndexReportStream, error) { connectionReadTimeout: 10 * time.Second, waitAfterFailedAccept: time.Second, maxSizeBytes: maxSizeBytes, - }, nil + } } // Start begins accepting vsock connections and returns a channel of validated reports. diff --git a/compliance/virtualmachines/relay/vsock/vsock.go b/compliance/virtualmachines/relay/vsock/vsock.go index 17d36359f42e3..0cd6f76e25539 100644 --- a/compliance/virtualmachines/relay/vsock/vsock.go +++ b/compliance/virtualmachines/relay/vsock/vsock.go @@ -30,6 +30,9 @@ func ExtractVsockCIDFromConnection(conn net.Conn) (uint32, error) { // NewListener creates a vsock listener on the host context ID (vsock.Host) using the port // from VirtualMachinesVsockPort env var. Caller must close the returned listener. +// +// One-shot bind: ListenContextID fails immediately if vsock is unsupported (e.g. 
kernel module not loaded, +// KubeVirt not installed). The caller receives the error; there is no retry at this layer. func NewListener() (net.Listener, error) { port := env.VirtualMachinesVsockPort.IntegerSetting() listener, err := vsock.ListenContextID(vsock.Host, uint32(port), nil) diff --git a/compliance/vm_relay_retry.go b/compliance/vm_relay_retry.go new file mode 100644 index 0000000000000..02e3cc876a2e8 --- /dev/null +++ b/compliance/vm_relay_retry.go @@ -0,0 +1,39 @@ +package compliance + +import ( + "context" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/stackrox/rox/compliance/virtualmachines/relay/stream" + "github.com/stackrox/rox/pkg/logging" +) + +var vmRelayLog = logging.LoggerForModule() + +// createVMRelayStreamWithRetry creates a vsock stream with retry and backoff. +// Retries until the factory succeeds or ctx is cancelled. Used to recover when vsock +// becomes available after KubeVirt is installed. +func createVMRelayStreamWithRetry(ctx context.Context, createStream func() (*stream.VsockIndexReportStream, error)) (*stream.VsockIndexReportStream, error) { + eb := backoff.NewExponentialBackOff() + eb.MaxInterval = 30 * time.Second + eb.MaxElapsedTime = 0 // Retry indefinitely until vsock becomes available or ctx cancelled + b := backoff.WithContext(eb, ctx) + + var reportStream *stream.VsockIndexReportStream + operation := func() error { + var err error + reportStream, err = createStream() + return err + } + notify := func(err error, d time.Duration) { + vmRelayLog.Infof("Vsock bind failed, retrying in %0.2f seconds: %v", d.Seconds(), err) + } + if err := backoff.RetryNotify(operation, b, notify); err != nil { + if ctx.Err() != nil { + return nil, ctx.Err() + } + return nil, err + } + return reportStream, nil +} diff --git a/compliance/vm_relay_retry_test.go b/compliance/vm_relay_retry_test.go new file mode 100644 index 0000000000000..5c585f1dc684a --- /dev/null +++ b/compliance/vm_relay_retry_test.go @@ -0,0 +1,75 @@ +package 
compliance + +import ( + "context" + "errors" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/stackrox/rox/compliance/virtualmachines/relay/stream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreateVMRelayStreamWithRetry_SucceedsAfterFailures(t *testing.T) { + failCount := 2 + attempts := atomic.Int32{} + + createStream := func() (*stream.VsockIndexReportStream, error) { + n := attempts.Add(1) + // Simulate failure cases: return an error until the success case is reached. + if n <= int32(failCount) { + return nil, errors.New("vsock not available") + } + // Success case: use a TCP listener as a stand-in for vsock (test-only). + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + s, err := stream.NewWithListener(listener) + require.NoError(t, err) + return s, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + reportStream, err := createVMRelayStreamWithRetry(ctx, createStream) + require.NoError(t, err) + require.NotNil(t, reportStream) + assert.GreaterOrEqual(t, attempts.Load(), int32(failCount+1), "should have retried at least %d times before success", failCount+1) +} + +func TestCreateVMRelayStreamWithRetry_CancellationStopsRetryPromptly(t *testing.T) { + attempts := atomic.Int32{} + createStream := func() (*stream.VsockIndexReportStream, error) { + attempts.Add(1) + return nil, errors.New("vsock not available") + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + _, err := createVMRelayStreamWithRetry(ctx, createStream) + + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) + assert.LessOrEqual(t, attempts.Load(), int32(1), "should not retry after cancellation") +} + +func TestCreateVMRelayStreamWithRetry_SucceedsImmediately(t *testing.T) { + createStream := func() (*stream.VsockIndexReportStream, error) { + listener, 
err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + return stream.NewWithListener(listener) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + reportStream, err := createVMRelayStreamWithRetry(ctx, createStream) + require.NoError(t, err) + require.NotNil(t, reportStream) +}