diff --git a/pkg/centralsensor/caps_list.go b/pkg/centralsensor/caps_list.go index e31cd51e8a556..e0b85406eee2b 100644 --- a/pkg/centralsensor/caps_list.go +++ b/pkg/centralsensor/caps_list.go @@ -24,4 +24,7 @@ const ( // AuditLogEventsCap identifies the capability to handle audit log event detection. AuditLogEventsCap SensorCapability = "AuditLogEvents" + + // LocalScannerCredentialsRefresh identifies the capability to maintain the Local scanner TLS credentials refreshed. + LocalScannerCredentialsRefresh SensorCapability = "LocalScannerCredentialsRefresh" ) diff --git a/sensor/kubernetes/localscanner/cert_refresher.go b/sensor/kubernetes/localscanner/cert_refresher.go new file mode 100644 index 0000000000000..4c69e25a47f6a --- /dev/null +++ b/sensor/kubernetes/localscanner/cert_refresher.go @@ -0,0 +1,107 @@ +package localscanner + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/concurrency" + "github.com/stackrox/rox/pkg/logging" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" +) + +var ( + log = logging.LoggerForModule() + certsDescription = "local scanner credentials" + _ certRefresher = (*certRefresherImpl)(nil) +) + +type certRefresher interface { + Start() + Stop() +} + +func newCertRefresher(requestCertificates requestCertificatesFunc, timeout time.Duration, + backoff wait.Backoff) certRefresher { + return &certRefresherImpl{ + requestCertificates: requestCertificates, + certRefreshTimeout: timeout, + certRefreshBackoff: backoff, + } +} + +type certRefresherImpl struct { + requestCertificates requestCertificatesFunc + certRefreshTimeout time.Duration + certRefreshBackoff wait.Backoff + certRefreshTicker *concurrency.RetryTicker + certSecretsRepoImpl // FIXME to composition +} + +type requestCertificatesFunc func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) + +func (i *certRefresherImpl) Start() { + i.certRefreshTicker = concurrency.NewRetryTicker(i.RefreshCertificates, i.certRefreshTimeout, i.certRefreshBackoff) + i.certRefreshTicker.OnTickSuccess = i.logRefreshSuccess + i.certRefreshTicker.OnTickError = i.logRefreshError + i.certRefreshTicker.Start() +} + +func (i *certRefresherImpl) Stop() { + if i.certRefreshTicker != nil { + i.certRefreshTicker.Stop() + } +} + +// RefreshCertificates determines refreshes the certificate secrets if needed, and returns the time +// until the next refresh. +// This is running in the goroutine for a refresh timer in i.certRefresher. +func (i *certRefresherImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) { + secrets, fetchErr := i.getSecrets(ctx) + if fetchErr != nil { + return 0, fetchErr + } + timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) + if timeToRefresh > 0 { + return timeToRefresh, nil + } + + response, requestErr := i.requestCertificates(ctx) + if requestErr != nil { + return 0, requestErr + } + if response.GetError() != nil { + return 0, errors.Errorf("central refused to issue certificates: %s", response.GetError().GetMessage()) + } + + certificates := response.GetCertificates() + if refreshErr := i.updateSecrets(ctx, certificates, secrets); refreshErr != nil { + return 0, refreshErr + } + timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) + return timeToRefresh, nil +} + +func (i *certRefresherImpl) logRefreshSuccess(nextTimeToTick time.Duration) { + log.Infof("successfully refreshed %v", certsDescription) + log.Infof("%v scheduled to be refreshed in %s", certsDescription, nextTimeToTick) +} + +func (i *certRefresherImpl) logRefreshError(refreshErr error) { + log.Errorf("refreshing %s: %s", certsDescription, refreshErr) +} + +func (i *certRefresherImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1.Secret) time.Time { + return time.Now() // TODO +} + +// updateSecrets stores the certificates in the data of each corresponding secret, and then persists +// the secrets in k8s. +func (i *certRefresherImpl) updateSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet, + secrets map[storage.ServiceType]*v1.Secret) error { + // TODO + return i.putSecrets(ctx, secrets) +} diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go new file mode 100644 index 0000000000000..ab62d7167b771 --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -0,0 +1,66 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/uuid" +) + +var ( + _ CertificateRequester = (*certRequesterSyncImpl)(nil) +) + +// CertificateRequester request a new set of local scanner certificates to central. +type CertificateRequester interface { + RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) +} + +// NewCertificateRequester creates a new CertificateRequester that communicates through +// the specified channels, and that uses a fresh request ID. +func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, + msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { + return &certRequesterSyncImpl{ + requestID: uuid.NewV4().String(), + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + } +} + +type certRequesterSyncImpl struct { + requestID string + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse +} + +func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: i.requestID, + }, + }, + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case i.msgFromSensorC <- msg: + log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + } + + var response *central.IssueLocalScannerCertsResponse + for response == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case newResponse := <-i.msgToSensorC: + if newResponse.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) + } else { + response = newResponse + } + } + } + + return response, nil +} diff --git a/sensor/kubernetes/localscanner/secrets_repository.go b/sensor/kubernetes/localscanner/secrets_repository.go new file mode 100644 index 0000000000000..39b59c8ed094a --- /dev/null +++ b/sensor/kubernetes/localscanner/secrets_repository.go @@ -0,0 +1,30 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/storage" + v1 "k8s.io/api/core/v1" +) + +var ( + _ certSecretsRepo = (*certSecretsRepoImpl)(nil) +) + +type certSecretsRepo interface { + getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) + putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error +} + +type certSecretsRepoImpl struct { + // TODO secretsClient corev1.SecretInterface +} + +func (i *certSecretsRepoImpl) getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) { + secretsMap := make(map[storage.ServiceType]*v1.Secret, 3) + return secretsMap, nil // TODO +} + +func (i *certSecretsRepoImpl) putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error { + return nil // TODO +} diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go new file mode 100644 index 0000000000000..53b327c71c3ca --- /dev/null +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -0,0 +1,81 @@ +package localscanner + +import ( + "context" + "time" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/centralsensor" + "github.com/stackrox/rox/sensor/common" + "k8s.io/apimachinery/pkg/util/wait" +) + +var ( + _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) +) + +// NewLocalScannerTLSIssuer creates a sensor component that will keep the local scanner certificates +// fresh, using the specified retry parameters. +func NewLocalScannerTLSIssuer(certRefreshTimeout time.Duration, certRefreshBackoff wait.Backoff) common.SensorComponent { + msgFromSensorC := make(msgFromSensorChan) + msgToSensorC := make(msgToSensorChan) + requestCertificates := func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + certRequester := NewCertificateRequester(msgFromSensorC, msgToSensorC) + return certRequester.RequestCertificates(ctx) + } + return &localScannerTLSIssuerImpl{ + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + certRefresher: newCertRefresher(requestCertificates, certRefreshTimeout, certRefreshBackoff), + } +} + +type msgFromSensorChan chan *central.MsgFromSensor +type msgToSensorChan chan *central.IssueLocalScannerCertsResponse +type localScannerTLSIssuerImpl struct { + msgFromSensorC msgFromSensorChan + msgToSensorC msgToSensorChan + certRefresher certRefresher +} + +func (i *localScannerTLSIssuerImpl) Start() error { + log.Info("starting local scanner TLS issuer.") + i.certRefresher.Start() + log.Info("local scanner TLS issuer started.") + + return nil +} + +func (i *localScannerTLSIssuerImpl) Stop(err error) { + i.certRefresher.Stop() + log.Info("local scanner TLS issuer stopped.") +} + +func (i *localScannerTLSIssuerImpl) Capabilities() []centralsensor.SensorCapability { + return []centralsensor.SensorCapability{centralsensor.LocalScannerCredentialsRefresh} +} + +// ResponsesC is called "responses" because for other SensorComponent it is central that +// initiates the interaction. However, here it is sensor which sends a request to central. +func (i *localScannerTLSIssuerImpl) ResponsesC() <-chan *central.MsgFromSensor { + return i.msgFromSensorC +} + +// ProcessMessage is how the central receiver delivers messages from central to SensorComponents. +// This method must not block as it would prevent centralReceiverImpl from sending messages +// to other SensorComponents. +func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) error { + switch m := msg.GetMsg().(type) { + case *central.MsgToSensor_IssueLocalScannerCertsResponse: + response := m.IssueLocalScannerCertsResponse + go func() { + // will block if i.resultC is filled. + i.msgToSensorC <- response + }() + return nil + default: + // silently ignore other messages broadcasted by centralReceiverImpl, as centralReceiverImpl logs + // all returned errors with error level. + return nil + } +}