diff --git a/compliance/compliance.go b/compliance/compliance.go index 08f0c42bfeebf..8ead20cbfc2aa 100644 --- a/compliance/compliance.go +++ b/compliance/compliance.go @@ -125,24 +125,33 @@ func (c *Compliance) Start() { // The virtual machine relay (ROX-30476), which reads VM index reports from vsock connections and forwards them to // sensor, is currently started and run in the compliance container. This enables reusing the existing connection to // sensor and accelerates initial development. + // + // Startup flow: stream.New() binds to vsock (via vsock.NewListener). If vsock is unavailable (e.g. KubeVirt not yet + // installed), the bind fails. Retry with backoff below recovers when vsock becomes available after KubeVirt is + // installed; without retry the relay would be disabled for the pod lifetime until restart. go func(ctx context.Context) { defer wg.Add(-1) - if features.VirtualMachines.Enabled() { - log.Infof("Virtual machine relay enabled") + if !features.VirtualMachines.Enabled() { + return + } + log.Infof("Virtual machine relay enabled") - reportStream, err := stream.New() - if err != nil { - log.Errorf("Error creating report stream: %v", err) + reportStream, err := createVMRelayStreamWithRetry(ctx, stream.New) + if err != nil { + if ctx.Err() != nil { + log.Info("Virtual machine relay startup cancelled during vsock retry") return } + log.Errorf("Error creating report stream after retries: %v", err) + return + } - sensorClient := sensor.NewVirtualMachineIndexReportServiceClient(conn) - reportSender := sender.New(sensorClient) + sensorClient := sensor.NewVirtualMachineIndexReportServiceClient(conn) + reportSender := sender.New(sensorClient) - vmRelay := relay.New(reportStream, reportSender) - if err := vmRelay.Run(ctx); err != nil { - log.Errorf("Error running virtual machine relay: %v", err) - } + vmRelay := relay.New(reportStream, reportSender) + if err := vmRelay.Run(ctx); err != nil { + log.Errorf("Error running virtual machine relay: %v", err) } }(ctx) diff --git a/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go b/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go index 22011519e97ac..b640d85d60a04 100644 --- a/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go +++ b/compliance/virtualmachines/relay/stream/vsock_index_report_stream.go @@ -38,12 +38,28 @@ type VsockIndexReportStream struct { // New creates a VsockIndexReportStream with a vsock listener. // Concurrency limits are read from env vars VirtualMachinesMaxConcurrentVsockConnections // and VirtualMachinesConcurrencyTimeout. +// +// Bind-failure return path: vsock.NewListener() performs a one-shot bind. If vsock support is unavailable +// (e.g. KubeVirt not yet installed), the error propagates to the caller. There is no retry here; the caller +// (compliance) is responsible for retrying if desired. func New() (*VsockIndexReportStream, error) { listener, err := vsock.NewListener() if err != nil { return nil, errors.Wrap(err, "creating vsock listener") } + return newWithListener(listener), nil +} + +// NewWithListener creates a VsockIndexReportStream with the given listener. +// For testing only; use New() in production. +func NewWithListener(listener net.Listener) (*VsockIndexReportStream, error) { + if listener == nil { + return nil, errors.New("listener is nil") + } + return newWithListener(listener), nil +} +func newWithListener(listener net.Listener) *VsockIndexReportStream { maxConcurrentConnections := env.VirtualMachinesMaxConcurrentVsockConnections.IntegerSetting() semaphoreTimeout := env.VirtualMachinesConcurrencyTimeout.DurationSetting() maxSizeBytes := env.VirtualMachinesVsockConnMaxSizeKB.IntegerSetting() * 1024 @@ -56,7 +72,7 @@ func New() (*VsockIndexReportStream, error) { connectionReadTimeout: 10 * time.Second, waitAfterFailedAccept: time.Second, maxSizeBytes: maxSizeBytes, - }, nil + } } // Start begins accepting vsock connections and returns a channel of validated reports. diff --git a/compliance/virtualmachines/relay/vsock/vsock.go b/compliance/virtualmachines/relay/vsock/vsock.go index 17d36359f42e3..0cd6f76e25539 100644 --- a/compliance/virtualmachines/relay/vsock/vsock.go +++ b/compliance/virtualmachines/relay/vsock/vsock.go @@ -30,6 +30,9 @@ func ExtractVsockCIDFromConnection(conn net.Conn) (uint32, error) { // NewListener creates a vsock listener on the host context ID (vsock.Host) using the port // from VirtualMachinesVsockPort env var. Caller must close the returned listener. +// +// One-shot bind: ListenContextID fails immediately if vsock is unsupported (e.g. kernel module not loaded, +// KubeVirt not installed). The caller receives the error; there is no retry at this layer. func NewListener() (net.Listener, error) { port := env.VirtualMachinesVsockPort.IntegerSetting() listener, err := vsock.ListenContextID(vsock.Host, uint32(port), nil) diff --git a/compliance/vm_relay_retry.go b/compliance/vm_relay_retry.go new file mode 100644 index 0000000000000..02e3cc876a2e8 --- /dev/null +++ b/compliance/vm_relay_retry.go @@ -0,0 +1,39 @@ +package compliance + +import ( + "context" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/stackrox/rox/compliance/virtualmachines/relay/stream" + "github.com/stackrox/rox/pkg/logging" +) + +var vmRelayLog = logging.LoggerForModule() + +// createVMRelayStreamWithRetry creates a vsock stream with retry and backoff. +// Retries until the factory succeeds or ctx is cancelled. Used to recover when vsock +// becomes available after KubeVirt is installed. +func createVMRelayStreamWithRetry(ctx context.Context, createStream func() (*stream.VsockIndexReportStream, error)) (*stream.VsockIndexReportStream, error) { + eb := backoff.NewExponentialBackOff() + eb.MaxInterval = 30 * time.Second + eb.MaxElapsedTime = 0 // Retry indefinitely until vsock becomes available or ctx cancelled + b := backoff.WithContext(eb, ctx) + + var reportStream *stream.VsockIndexReportStream + operation := func() error { + var err error + reportStream, err = createStream() + return err + } + notify := func(err error, d time.Duration) { + vmRelayLog.Infof("Vsock bind failed, retrying in %0.2f seconds: %v", d.Seconds(), err) + } + if err := backoff.RetryNotify(operation, b, notify); err != nil { + if ctx.Err() != nil { + return nil, ctx.Err() + } + return nil, err + } + return reportStream, nil +} diff --git a/compliance/vm_relay_retry_test.go b/compliance/vm_relay_retry_test.go new file mode 100644 index 0000000000000..5c585f1dc684a --- /dev/null +++ b/compliance/vm_relay_retry_test.go @@ -0,0 +1,75 @@ +package compliance + +import ( + "context" + "errors" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/stackrox/rox/compliance/virtualmachines/relay/stream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreateVMRelayStreamWithRetry_SucceedsAfterFailures(t *testing.T) { + failCount := 2 + attempts := atomic.Int32{} + + createStream := func() (*stream.VsockIndexReportStream, error) { + n := attempts.Add(1) + // Simulate failure cases: return an error until the success case is reached. + if n <= int32(failCount) { + return nil, errors.New("vsock not available") + } + // Success case: use a TCP listener as a stand-in for vsock (test-only). + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + s, err := stream.NewWithListener(listener) + require.NoError(t, err) + return s, nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + reportStream, err := createVMRelayStreamWithRetry(ctx, createStream) + require.NoError(t, err) + require.NotNil(t, reportStream) + assert.GreaterOrEqual(t, attempts.Load(), int32(failCount+1), "should have retried at least %d times before success", failCount+1) +} + +func TestCreateVMRelayStreamWithRetry_CancellationStopsRetryPromptly(t *testing.T) { + attempts := atomic.Int32{} + createStream := func() (*stream.VsockIndexReportStream, error) { + attempts.Add(1) + return nil, errors.New("vsock not available") + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + _, err := createVMRelayStreamWithRetry(ctx, createStream) + + require.Error(t, err) + assert.ErrorIs(t, err, context.Canceled) + assert.LessOrEqual(t, attempts.Load(), int32(1), "should not retry after cancellation") +} + +func TestCreateVMRelayStreamWithRetry_SucceedsImmediately(t *testing.T) { + createStream := func() (*stream.VsockIndexReportStream, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + return stream.NewWithListener(listener) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + reportStream, err := createVMRelayStreamWithRetry(ctx, createStream) + require.NoError(t, err) + require.NotNil(t, reportStream) +}