Skip to content
3 changes: 3 additions & 0 deletions pkg/centralsensor/caps_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ const (

// AuditLogEventsCap identifies the capability to handle audit log event detection.
AuditLogEventsCap SensorCapability = "AuditLogEvents"

// LocalScannerCredentialsRefresh identifies the capability to maintain the Local scanner TLS credentials refreshed.
LocalScannerCredentialsRefresh SensorCapability = "LocalScannerCredentialsRefresh"
)
107 changes: 107 additions & 0 deletions sensor/kubernetes/localscanner/cert_refresher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package localscanner

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/logging"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

var (
log = logging.LoggerForModule()
certsDescription = "local scanner credentials"
_ certRefresher = (*certRefresherImpl)(nil)
)

type certRefresher interface {
Start()
Stop()
}

func newCertRefresher(requestCertificates requestCertificatesFunc, timeout time.Duration,
backoff wait.Backoff) certRefresher {
return &certRefresherImpl{
requestCertificates: requestCertificates,
certRefreshTimeout: timeout,
certRefreshBackoff: backoff,
}
}

type certRefresherImpl struct {
requestCertificates requestCertificatesFunc
certRefreshTimeout time.Duration
certRefreshBackoff wait.Backoff
certRefreshTicker *concurrency.RetryTicker
certSecretsRepoImpl // FIXME to composition
}

type requestCertificatesFunc func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error)

func (i *certRefresherImpl) Start() {
i.certRefreshTicker = concurrency.NewRetryTicker(i.RefreshCertificates, i.certRefreshTimeout, i.certRefreshBackoff)
i.certRefreshTicker.OnTickSuccess = i.logRefreshSuccess
i.certRefreshTicker.OnTickError = i.logRefreshError
i.certRefreshTicker.Start()
}

func (i *certRefresherImpl) Stop() {
if i.certRefreshTicker != nil {
i.certRefreshTicker.Stop()
}
}

// RefreshCertificates determines refreshes the certificate secrets if needed, and returns the time
// until the next refresh.
// This is running in the goroutine for a refresh timer in i.certRefresher.
func (i *certRefresherImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) {
secrets, fetchErr := i.getSecrets(ctx)
if fetchErr != nil {
return 0, fetchErr
}
timeToRefresh = time.Until(i.getCertRenewalTime(secrets))
if timeToRefresh > 0 {
return timeToRefresh, nil
}

response, requestErr := i.requestCertificates(ctx)
if requestErr != nil {
return 0, requestErr
}
if response.GetError() != nil {
return 0, errors.Errorf("central refused to issue certificates: %s", response.GetError().GetMessage())
}

certificates := response.GetCertificates()
if refreshErr := i.updateSecrets(ctx, certificates, secrets); refreshErr != nil {
return 0, refreshErr
}
timeToRefresh = time.Until(i.getCertRenewalTime(secrets))
return timeToRefresh, nil
}

func (i *certRefresherImpl) logRefreshSuccess(nextTimeToTick time.Duration) {
log.Infof("successfully refreshed %v", certsDescription)
log.Infof("%v scheduled to be refreshed in %s", certsDescription, nextTimeToTick)
}

func (i *certRefresherImpl) logRefreshError(refreshErr error) {
log.Errorf("refreshing %s: %s", certsDescription, refreshErr)
}

func (i *certRefresherImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1.Secret) time.Time {
return time.Now() // TODO
}

// updateSecrets stores the certificates in the data of each corresponding secret, and then persists
// the secrets in k8s.
func (i *certRefresherImpl) updateSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet,
secrets map[storage.ServiceType]*v1.Secret) error {
// TODO
return i.putSecrets(ctx, secrets)
}
66 changes: 66 additions & 0 deletions sensor/kubernetes/localscanner/certificate_requester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package localscanner

import (
"context"

"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/pkg/uuid"
)

var (
_ CertificateRequester = (*certRequesterSyncImpl)(nil)
)

// CertificateRequester request a new set of local scanner certificates to central.
type CertificateRequester interface {
RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error)
}

// NewCertificateRequester creates a new CertificateRequester that communicates through
// the specified channels, and that uses a fresh request ID.
func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor,
msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester {
return &certRequesterSyncImpl{
requestID: uuid.NewV4().String(),
msgFromSensorC: msgFromSensorC,
msgToSensorC: msgToSensorC,
}
}

type certRequesterSyncImpl struct {
requestID string
msgFromSensorC chan *central.MsgFromSensor
msgToSensorC chan *central.IssueLocalScannerCertsResponse
}

func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) {
msg := &central.MsgFromSensor{
Msg: &central.MsgFromSensor_IssueLocalScannerCertsRequest{
IssueLocalScannerCertsRequest: &central.IssueLocalScannerCertsRequest{
RequestId: i.requestID,
},
},
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case i.msgFromSensorC <- msg:
log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg)
}

var response *central.IssueLocalScannerCertsResponse
for response == nil {
select {
case <-ctx.Done():
return nil, ctx.Err()
case newResponse := <-i.msgToSensorC:
if newResponse.GetRequestId() != i.requestID {
log.Debugf("ignoring response with unknown request id %s", response.GetRequestId())
} else {
response = newResponse
}
}
}

return response, nil
}
30 changes: 30 additions & 0 deletions sensor/kubernetes/localscanner/secrets_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package localscanner
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be relatively simple to create fake k8s clients if you want to use it for testing.


import (
"context"

"github.com/stackrox/rox/generated/storage"
v1 "k8s.io/api/core/v1"
)

var (
_ certSecretsRepo = (*certSecretsRepoImpl)(nil)
)

type certSecretsRepo interface {
getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error)
putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error
}

type certSecretsRepoImpl struct {
// TODO secretsClient corev1.SecretInterface
}

func (i *certSecretsRepoImpl) getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) {
secretsMap := make(map[storage.ServiceType]*v1.Secret, 3)
return secretsMap, nil // TODO
}

func (i *certSecretsRepoImpl) putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error {
return nil // TODO
}
81 changes: 81 additions & 0 deletions sensor/kubernetes/localscanner/tls_issuer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package localscanner

import (
"context"
"time"

"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/pkg/centralsensor"
"github.com/stackrox/rox/sensor/common"
"k8s.io/apimachinery/pkg/util/wait"
)

var (
_ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil)
)

// NewLocalScannerTLSIssuer creates a sensor component that will keep the local scanner certificates
// fresh, using the specified retry parameters.
func NewLocalScannerTLSIssuer(certRefreshTimeout time.Duration, certRefreshBackoff wait.Backoff) common.SensorComponent {
msgFromSensorC := make(msgFromSensorChan)
msgToSensorC := make(msgToSensorChan)
requestCertificates := func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) {
certRequester := NewCertificateRequester(msgFromSensorC, msgToSensorC)
return certRequester.RequestCertificates(ctx)
}
return &localScannerTLSIssuerImpl{
msgFromSensorC: msgFromSensorC,
msgToSensorC: msgToSensorC,
certRefresher: newCertRefresher(requestCertificates, certRefreshTimeout, certRefreshBackoff),
}
}

type msgFromSensorChan chan *central.MsgFromSensor
type msgToSensorChan chan *central.IssueLocalScannerCertsResponse
type localScannerTLSIssuerImpl struct {
msgFromSensorC msgFromSensorChan
msgToSensorC msgToSensorChan
certRefresher certRefresher
}

func (i *localScannerTLSIssuerImpl) Start() error {
log.Info("starting local scanner TLS issuer.")
i.certRefresher.Start()
log.Info("local scanner TLS issuer started.")

return nil
}

func (i *localScannerTLSIssuerImpl) Stop(err error) {
i.certRefresher.Stop()
log.Info("local scanner TLS issuer stopped.")
}

func (i *localScannerTLSIssuerImpl) Capabilities() []centralsensor.SensorCapability {
return []centralsensor.SensorCapability{centralsensor.LocalScannerCredentialsRefresh}
}

// ResponsesC is called "responses" because for other SensorComponent it is central that
// initiates the interaction. However, here it is sensor which sends a request to central.
func (i *localScannerTLSIssuerImpl) ResponsesC() <-chan *central.MsgFromSensor {
return i.msgFromSensorC
}

// ProcessMessage is how the central receiver delivers messages from central to SensorComponents.
// This method must not block as it would prevent centralReceiverImpl from sending messages
// to other SensorComponents.
func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) error {
switch m := msg.GetMsg().(type) {
case *central.MsgToSensor_IssueLocalScannerCertsResponse:
response := m.IssueLocalScannerCertsResponse
go func() {
// will block if i.resultC is filled.
i.msgToSensorC <- response
}()
return nil
default:
// silently ignore other messages broadcasted by centralReceiverImpl, as centralReceiverImpl logs
// all returned errors with error level.
return nil
}
}