From cde681bef7684b170b09eea8697f3d534a9506f2 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 21 Jan 2022 19:02:01 +0100 Subject: [PATCH 01/13] Draft TLSIssuer as a sensor component and certificate source That uses CertRefreher to schedule refresh --- sensor/kubernetes/localscanner/tls_issuer.go | 208 +++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 sensor/kubernetes/localscanner/tls_issuer.go diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go new file mode 100644 index 0000000000000..37334cf17d57f --- /dev/null +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -0,0 +1,208 @@ +package localscanner + +import ( + "context" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/centralsensor" + "github.com/stackrox/rox/pkg/logging" + "github.com/stackrox/rox/pkg/retry" + "github.com/stackrox/rox/pkg/uuid" + "github.com/stackrox/rox/sensor/common" + "github.com/stackrox/rox/sensor/kubernetes/certificates" + "k8s.io/apimachinery/pkg/util/wait" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +var ( + log = logging.LoggerForModule() + _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) + _ certificates.CertificateSource = (*localScannerTLSIssuerImpl)(nil) +) + +// FIXME separate files for different structs +type localScannerTLSIssuerImpl struct { + conf config + certRefresher certificates.CertRefresher + certificateSourceImpl + sensorComponentImpl +} + +type config struct { + sensorNamespace string + secretsClient corev1.SecretInterface +} + +type certificateSourceImpl struct { + requestID string + resultC chan *retry.Result + // protects both requestID and resultC + certSourceStateMutex sync.Mutex + sensorComponentImpl +} + +type sensorComponentImpl struct { + requestsC chan *central.MsgFromSensor +} + +/* +TODO create function + resultC = make(chan *retry.Result) +*/ + +func (i *localScannerTLSIssuerImpl) Start() error { + log.Info("starting local scanner TLS issuer.") + + var certRequestBackoff wait.Backoff // FIXME + i.certRefresher = certificates.NewCertRefresher("FIXME desc", i, certRequestBackoff) + i.certRefresher.Start(context.Background()) + + log.Info("local scanner TLS issuer started.") + + return nil +} + +func (i *localScannerTLSIssuerImpl) Stop(err error) { + if i.certRefresher != nil { + i.certRefresher.Stop() + } + i.Close() + log.Info("local scanner TLS issuer stopped.") +} + +func (i *sensorComponentImpl) Capabilities() []centralsensor.SensorCapability { + return []centralsensor.SensorCapability{} // FIXME +} + +// ResponsesC is called "responses" because for other SensorComponent it is central that +// initiates the interaction. However, here it is sensor which sends a request to central. +func (i *sensorComponentImpl) ResponsesC() <-chan *central.MsgFromSensor { + return i.requestsC +} + +// ProcessMessage cannot block as it would prevent centralReceiverImpl from sending messages +// to other SensorComponent. +func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) error { + switch m := msg.GetMsg().(type) { + case *central.MsgToSensor_IssueLocalScannerCertsResponse: + response := m.IssueLocalScannerCertsResponse + go func() { + i.processIssueLocalScannerCertsResponse(response) + }() + return nil + default: + // silently ignore other messages broadcasted by centralReceiverImpl, as centralReceiverImpl logs + // all returned errors with error level. + return nil + } +} + +func (i *certificateSourceImpl) processIssueLocalScannerCertsResponse(response *central.IssueLocalScannerCertsResponse) { + i.certSourceStateMutex.Lock() + defer i.certSourceStateMutex.Unlock() + + if response.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) + return + } + i.requestID = "" + var result *retry.Result + if response.GetError() != nil { + result = &retry.Result{Err: errors.Errorf("server side error: %s", response.GetError().GetMessage())} + } else { + // retry.Result is untyped, so at least type here. + var certificates *storage.TypedServiceCertificateSet + certificates = response.GetCertificates() + result = &retry.Result{ V: certificates } + } + resultC := i.resultC + go func() { + // can block if i.resultC is filled. + resultC <- result + }() +} + +func (i *certificateSourceImpl) AskForResult(ctx context.Context) <-chan *retry.Result { + resultC := i.resetCertSource() + go func() { + i.Retry() + }() + + return resultC +} + +// Retry blocks until the message is sent or we get a timeout. +func (i *certificateSourceImpl) Retry() { + i.certSourceStateMutex.Lock() + defer i.certSourceStateMutex.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) // FIXME timeout and context, or retry with backoff in a goroutine + defer cancel() + + requestID := uuid.NewV4().String() + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: requestID, + }, + }, + } + select { + case i.requestsC <- msg: + i.requestID = requestID + log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + case <-ctx.Done(): + i.requestID = "" + resultC := i.resultC + go func() { + // can block if i.resultC is filled. + resultC <- &retry.Result{ Err: errors.Wrap(ctx.Err(), "sending the request to central") } + }() + } +} + +func (i *certificateSourceImpl) Close() { + i.certSourceStateMutex.Lock() + defer i.certSourceStateMutex.Unlock() + + oldResultC := i.resultC + i.doResetCertSource() + go func() { + if oldResultC != nil { + // drain channel in case the reader gave up, to avoid + // zombie goroutines. + for { + select { + case <-oldResultC: + default: + break + } + } + } + }() +} + +func (i *certificateSourceImpl) HandleCertificates(certificates *storage.TypedServiceCertificateSet) (timeToRefresh time.Duration, err error) { + // TODO get secrets => secretRepository type + if certificates != nil { + // TODO update and store secrets => secretRepository type + } + // TODO get duration from secrets => secretExpirationStrategy type + return time.Minute, nil // FIXME +} + +func (i *certificateSourceImpl) resetCertSource() chan *retry.Result { + i.certSourceStateMutex.Lock() + defer i.certSourceStateMutex.Unlock() + + i.doResetCertSource() + return i.resultC +} +func (i *certificateSourceImpl) doResetCertSource() { + i.requestID = "" + i.resultC = make(chan *retry.Result) +} From 67fe75e34b0553ec147bd015fcb76c753d82eb11 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 21 Jan 2022 19:08:32 +0100 Subject: [PATCH 02/13] Fix style --- sensor/kubernetes/localscanner/tls_issuer.go | 38 ++++++++++---------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 37334cf17d57f..90140a5573dab 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -15,31 +15,33 @@ import ( "github.com/stackrox/rox/sensor/common" "github.com/stackrox/rox/sensor/kubernetes/certificates" "k8s.io/apimachinery/pkg/util/wait" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) var ( - log = logging.LoggerForModule() - _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) - _ certificates.CertificateSource = (*localScannerTLSIssuerImpl)(nil) + log = logging.LoggerForModule() + _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) + _ certificates.CertificateSource = (*localScannerTLSIssuerImpl)(nil) ) // FIXME separate files for different structs type localScannerTLSIssuerImpl struct { - conf config + // TODO for HandleCertificates: conf config certRefresher certificates.CertRefresher certificateSourceImpl sensorComponentImpl } +/* +TODO type config struct { - sensorNamespace string - secretsClient corev1.SecretInterface + sensorNamespace string + secretsClient corev1.SecretInterface } +*/ type certificateSourceImpl struct { requestID string - resultC chan *retry.Result + resultC chan *retry.Result // protects both requestID and resultC certSourceStateMutex sync.Mutex sensorComponentImpl @@ -59,7 +61,9 @@ func (i *localScannerTLSIssuerImpl) Start() error { var certRequestBackoff wait.Backoff // FIXME i.certRefresher = certificates.NewCertRefresher("FIXME desc", i, certRequestBackoff) - i.certRefresher.Start(context.Background()) + if err := i.certRefresher.Start(context.Background()); err != nil { + return err + } log.Info("local scanner TLS issuer started.") @@ -114,10 +118,7 @@ func (i *certificateSourceImpl) processIssueLocalScannerCertsResponse(response * if response.GetError() != nil { result = &retry.Result{Err: errors.Errorf("server side error: %s", response.GetError().GetMessage())} } else { - // retry.Result is untyped, so at least type here. - var certificates *storage.TypedServiceCertificateSet - certificates = response.GetCertificates() - result = &retry.Result{ V: certificates } + result = &retry.Result{V: response.GetCertificates()} } resultC := i.resultC go func() { @@ -160,7 +161,7 @@ func (i *certificateSourceImpl) Retry() { resultC := i.resultC go func() { // can block if i.resultC is filled. - resultC <- &retry.Result{ Err: errors.Wrap(ctx.Err(), "sending the request to central") } + resultC <- &retry.Result{Err: errors.Wrap(ctx.Err(), "sending the request to central")} }() } } @@ -179,7 +180,7 @@ func (i *certificateSourceImpl) Close() { select { case <-oldResultC: default: - break + return } } } @@ -188,9 +189,10 @@ func (i *certificateSourceImpl) Close() { func (i *certificateSourceImpl) HandleCertificates(certificates *storage.TypedServiceCertificateSet) (timeToRefresh time.Duration, err error) { // TODO get secrets => secretRepository type - if certificates != nil { - // TODO update and store secrets => secretRepository type - } + /* + if certificates != nil { + // TODO update and store secrets => secretRepository type + }*/ // TODO get duration from secrets => secretExpirationStrategy type return time.Minute, nil // FIXME } From ece084a1a77fabede54170e78e64576a9574eadd Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 24 Jan 2022 19:56:27 +0100 Subject: [PATCH 03/13] simplify TLS issuer --- sensor/kubernetes/localscanner/tls_issuer.go | 181 ++++++++----------- 1 file changed, 79 insertions(+), 102 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 90140a5573dab..3e9bc5d90c9d0 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -2,7 +2,6 @@ package localscanner import ( "context" - "sync" "time" "github.com/pkg/errors" @@ -10,57 +9,55 @@ import ( "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/centralsensor" "github.com/stackrox/rox/pkg/logging" - "github.com/stackrox/rox/pkg/retry" "github.com/stackrox/rox/pkg/uuid" "github.com/stackrox/rox/sensor/common" "github.com/stackrox/rox/sensor/kubernetes/certificates" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" ) var ( log = logging.LoggerForModule() _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) - _ certificates.CertificateSource = (*localScannerTLSIssuerImpl)(nil) + _ certificates.CertificateIssuer = (*localScannerTLSIssuerImpl)(nil) ) // FIXME separate files for different structs type localScannerTLSIssuerImpl struct { - // TODO for HandleCertificates: conf config + conf config certRefresher certificates.CertRefresher - certificateSourceImpl + certIssuerImpl sensorComponentImpl } -/* -TODO type config struct { - sensorNamespace string - secretsClient corev1.SecretInterface + // TODO sensorNamespace string + // TODO secretsClient corev1.SecretInterface + certRefresherBackoff wait.Backoff } -*/ -type certificateSourceImpl struct { - requestID string - resultC chan *retry.Result - // protects both requestID and resultC - certSourceStateMutex sync.Mutex +type sensorComponentImpl struct { + requestsC chan *central.MsgFromSensor + responsesC chan *central.IssueLocalScannerCertsResponse +} + +type certIssuerImpl struct { sensorComponentImpl + certSecretsRepoImpl } -type sensorComponentImpl struct { - requestsC chan *central.MsgFromSensor +type certSyncRequesterImpl struct { + requestID string + requestsC chan *central.MsgFromSensor + responsesC chan *central.IssueLocalScannerCertsResponse } -/* -TODO create function - resultC = make(chan *retry.Result) -*/ +type certSecretsRepoImpl struct{} func (i *localScannerTLSIssuerImpl) Start() error { log.Info("starting local scanner TLS issuer.") - var certRequestBackoff wait.Backoff // FIXME - i.certRefresher = certificates.NewCertRefresher("FIXME desc", i, certRequestBackoff) + i.certRefresher = certificates.NewCertRefresher("FIXME desc", i, i.conf.certRefresherBackoff) if err := i.certRefresher.Start(context.Background()); err != nil { return err } @@ -74,7 +71,6 @@ func (i *localScannerTLSIssuerImpl) Stop(err error) { if i.certRefresher != nil { i.certRefresher.Stop() } - i.Close() log.Info("local scanner TLS issuer stopped.") } @@ -89,13 +85,14 @@ func (i *sensorComponentImpl) ResponsesC() <-chan *central.MsgFromSensor { } // ProcessMessage cannot block as it would prevent centralReceiverImpl from sending messages -// to other SensorComponent. +// to other SensorComponent. This is running in the goroutine launched in centralReceiverImpl.Start. func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) error { switch m := msg.GetMsg().(type) { case *central.MsgToSensor_IssueLocalScannerCertsResponse: response := m.IssueLocalScannerCertsResponse go func() { - i.processIssueLocalScannerCertsResponse(response) + // will block if i.resultC is filled. + i.responsesC <- response }() return nil default: @@ -105,106 +102,86 @@ func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) err } } -func (i *certificateSourceImpl) processIssueLocalScannerCertsResponse(response *central.IssueLocalScannerCertsResponse) { - i.certSourceStateMutex.Lock() - defer i.certSourceStateMutex.Unlock() +// RefreshCertificates TODO doc +// This is running in the goroutine for a refresh timer in i.certRefresher. +func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) { + secrets, fetchErr := i.fetchSecrets() + if fetchErr != nil { + return 0, fetchErr + } + timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) + if timeToRefresh > 0 { + return timeToRefresh, nil + } - if response.GetRequestId() != i.requestID { - log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) - return + certRequester := &certSyncRequesterImpl{ + requestID: uuid.NewV4().String(), + requestsC: i.requestsC, + responsesC: i.responsesC, + } + response, requestErr := certRequester.requestCertificates(ctx) + if requestErr != nil { + return 0, requestErr } - i.requestID = "" - var result *retry.Result if response.GetError() != nil { - result = &retry.Result{Err: errors.Errorf("server side error: %s", response.GetError().GetMessage())} - } else { - result = &retry.Result{V: response.GetCertificates()} + return 0, errors.Errorf("server side error: %s", response.GetError().GetMessage()) } - resultC := i.resultC - go func() { - // can block if i.resultC is filled. - resultC <- result - }() -} - -func (i *certificateSourceImpl) AskForResult(ctx context.Context) <-chan *retry.Result { - resultC := i.resetCertSource() - go func() { - i.Retry() - }() - return resultC + certificates := response.GetCertificates() + if refreshErr := i.refreshSecrets(certificates, secrets); refreshErr != nil { + return 0, refreshErr + } + timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) + return timeToRefresh, nil } -// Retry blocks until the message is sent or we get a timeout. -func (i *certificateSourceImpl) Retry() { - i.certSourceStateMutex.Lock() - defer i.certSourceStateMutex.Unlock() +func (i *certIssuerImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1.Secret) time.Time { + return time.Now() // TODO +} - ctx, cancel := context.WithTimeout(context.Background(), time.Second) // FIXME timeout and context, or retry with backoff in a goroutine - defer cancel() +func (i *certIssuerImpl) refreshSecrets(certificates *storage.TypedServiceCertificateSet, + secrets map[storage.ServiceType]*v1.Secret) error { + // TODO + return i.updateSecrets(secrets) +} - requestID := uuid.NewV4().String() +func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { msg := ¢ral.MsgFromSensor{ Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ - RequestId: requestID, + RequestId: i.requestID, }, }, } select { + case <-ctx.Done(): + return nil, ctx.Err() case i.requestsC <- msg: - i.requestID = requestID log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) - case <-ctx.Done(): - i.requestID = "" - resultC := i.resultC - go func() { - // can block if i.resultC is filled. - resultC <- &retry.Result{Err: errors.Wrap(ctx.Err(), "sending the request to central")} - }() } -} -func (i *certificateSourceImpl) Close() { - i.certSourceStateMutex.Lock() - defer i.certSourceStateMutex.Unlock() - - oldResultC := i.resultC - i.doResetCertSource() - go func() { - if oldResultC != nil { - // drain channel in case the reader gave up, to avoid - // zombie goroutines. - for { - select { - case <-oldResultC: - default: - return - } + var response *central.IssueLocalScannerCertsResponse + for response == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case newResponse := <-i.responsesC: + if response.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) + } else { + response = newResponse } } - }() -} + } -func (i *certificateSourceImpl) HandleCertificates(certificates *storage.TypedServiceCertificateSet) (timeToRefresh time.Duration, err error) { - // TODO get secrets => secretRepository type - /* - if certificates != nil { - // TODO update and store secrets => secretRepository type - }*/ - // TODO get duration from secrets => secretExpirationStrategy type - return time.Minute, nil // FIXME + return response, nil } -func (i *certificateSourceImpl) resetCertSource() chan *retry.Result { - i.certSourceStateMutex.Lock() - defer i.certSourceStateMutex.Unlock() - - i.doResetCertSource() - return i.resultC +func (i *certSecretsRepoImpl) fetchSecrets() (map[storage.ServiceType]*v1.Secret, error) { + secretsMap := make(map[storage.ServiceType]*v1.Secret, 3) + return secretsMap, nil // TODO } -func (i *certificateSourceImpl) doResetCertSource() { - i.requestID = "" - i.resultC = make(chan *retry.Result) + +func (i *certSecretsRepoImpl) updateSecrets(secrets map[storage.ServiceType]*v1.Secret) error { + return nil // TODO } From ac3d87b09294dad671c22d1502d59eb911dd3100 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 24 Jan 2022 19:56:51 +0100 Subject: [PATCH 04/13] remove unused code From a10b8051571cc009e99ec7344799c637eaf3812e Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 24 Jan 2022 20:18:49 +0100 Subject: [PATCH 05/13] fix typo bug and minor cleanup --- sensor/kubernetes/localscanner/tls_issuer.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 3e9bc5d90c9d0..8fc8150c63ffc 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -32,7 +32,6 @@ type localScannerTLSIssuerImpl struct { type config struct { // TODO sensorNamespace string - // TODO secretsClient corev1.SecretInterface certRefresherBackoff wait.Backoff } @@ -52,7 +51,9 @@ type certSyncRequesterImpl struct { responsesC chan *central.IssueLocalScannerCertsResponse } -type certSecretsRepoImpl struct{} +type certSecretsRepoImpl struct { + // TODO secretsClient corev1.SecretInterface +} func (i *localScannerTLSIssuerImpl) Start() error { log.Info("starting local scanner TLS issuer.") @@ -166,7 +167,7 @@ func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*centr case <-ctx.Done(): return nil, ctx.Err() case newResponse := <-i.responsesC: - if response.GetRequestId() != i.requestID { + if newResponse.GetRequestId() != i.requestID { log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) } else { response = newResponse From 9ca0366b91f673d708387cc88d07c19fbc0d5bf7 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 11:59:14 +0100 Subject: [PATCH 06/13] remove context param of Start method in CertRefresher --- sensor/kubernetes/localscanner/tls_issuer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 8fc8150c63ffc..c8edae28669a5 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -59,7 +59,7 @@ func (i *localScannerTLSIssuerImpl) Start() error { log.Info("starting local scanner TLS issuer.") i.certRefresher = certificates.NewCertRefresher("FIXME desc", i, i.conf.certRefresherBackoff) - if err := i.certRefresher.Start(context.Background()); err != nil { + if err := i.certRefresher.Start(); err != nil { return err } From fe511bb3801e424d5cb4dcfc56786d47312882b1 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 12:38:53 +0100 Subject: [PATCH 07/13] pass context to all subcalls in RefreshCertificates --- sensor/kubernetes/localscanner/tls_issuer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index c8edae28669a5..917a8db680fc4 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -106,7 +106,7 @@ func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) err // RefreshCertificates TODO doc // This is running in the goroutine for a refresh timer in i.certRefresher. func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) { - secrets, fetchErr := i.fetchSecrets() + secrets, fetchErr := i.fetchSecrets(ctx) if fetchErr != nil { return 0, fetchErr } @@ -129,7 +129,7 @@ func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh } certificates := response.GetCertificates() - if refreshErr := i.refreshSecrets(certificates, secrets); refreshErr != nil { + if refreshErr := i.refreshSecrets(ctx, certificates, secrets); refreshErr != nil { return 0, refreshErr } timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) @@ -140,7 +140,7 @@ func (i *certIssuerImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1. return time.Now() // TODO } -func (i *certIssuerImpl) refreshSecrets(certificates *storage.TypedServiceCertificateSet, +func (i *certIssuerImpl) refreshSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet, secrets map[storage.ServiceType]*v1.Secret) error { // TODO return i.updateSecrets(secrets) @@ -178,7 +178,7 @@ func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*centr return response, nil } -func (i *certSecretsRepoImpl) fetchSecrets() (map[storage.ServiceType]*v1.Secret, error) { +func (i *certSecretsRepoImpl) fetchSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) { secretsMap := make(map[storage.ServiceType]*v1.Secret, 3) return secretsMap, nil // TODO } From dac9bb639fac4d6cb82540219745db1b2a45cc90 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 12:47:33 +0100 Subject: [PATCH 08/13] rename requestsC and responsesC to msgFromSensorC and msgToSensorC --- sensor/kubernetes/localscanner/tls_issuer.go | 24 ++++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 917a8db680fc4..72c708f86053a 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -36,8 +36,8 @@ type config struct { } type sensorComponentImpl struct { - requestsC chan *central.MsgFromSensor - responsesC chan *central.IssueLocalScannerCertsResponse + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse } type certIssuerImpl struct { @@ -46,9 +46,9 @@ type certIssuerImpl struct { } type certSyncRequesterImpl struct { - requestID string - requestsC chan *central.MsgFromSensor - responsesC chan *central.IssueLocalScannerCertsResponse + requestID string + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse } type certSecretsRepoImpl struct { @@ -82,7 +82,7 @@ func (i *sensorComponentImpl) Capabilities() []centralsensor.SensorCapability { // ResponsesC is called "responses" because for other SensorComponent it is central that // initiates the interaction. However, here it is sensor which sends a request to central. func (i *sensorComponentImpl) ResponsesC() <-chan *central.MsgFromSensor { - return i.requestsC + return i.msgFromSensorC } // ProcessMessage cannot block as it would prevent centralReceiverImpl from sending messages @@ -93,7 +93,7 @@ func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) err response := m.IssueLocalScannerCertsResponse go func() { // will block if i.resultC is filled. - i.responsesC <- response + i.msgToSensorC <- response }() return nil default: @@ -116,9 +116,9 @@ func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh } certRequester := &certSyncRequesterImpl{ - requestID: uuid.NewV4().String(), - requestsC: i.requestsC, - responsesC: i.responsesC, + requestID: uuid.NewV4().String(), + msgFromSensorC: i.msgFromSensorC, + msgToSensorC: i.msgToSensorC, } response, requestErr := certRequester.requestCertificates(ctx) if requestErr != nil { @@ -157,7 +157,7 @@ func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*centr select { case <-ctx.Done(): return nil, ctx.Err() - case i.requestsC <- msg: + case i.msgFromSensorC <- msg: log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) } @@ -166,7 +166,7 @@ func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*centr select { case <-ctx.Done(): return nil, ctx.Err() - case newResponse := <-i.responsesC: + case newResponse := <-i.msgToSensorC: if newResponse.GetRequestId() != i.requestID { log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) } else { From 32f8cd18761cf7156870f6538c505dae13207688 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 12:50:08 +0100 Subject: [PATCH 09/13] improve comments and logging --- sensor/kubernetes/localscanner/tls_issuer.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 72c708f86053a..ba1647ab98cd2 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -85,8 +85,9 @@ func (i *sensorComponentImpl) ResponsesC() <-chan *central.MsgFromSensor { return i.msgFromSensorC } -// ProcessMessage cannot block as it would prevent centralReceiverImpl from sending messages -// to other SensorComponent. This is running in the goroutine launched in centralReceiverImpl.Start. +// ProcessMessage is how the central receiver delivers messages from central to SensorComponents. +// This method must not block as it would prevent centralReceiverImpl from sending messages +// to other SensorComponents. func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) error { switch m := msg.GetMsg().(type) { case *central.MsgToSensor_IssueLocalScannerCertsResponse: @@ -125,7 +126,7 @@ func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh return 0, requestErr } if response.GetError() != nil { - return 0, errors.Errorf("server side error: %s", response.GetError().GetMessage()) + return 0, errors.Errorf("central refused to issue certificates: %s", response.GetError().GetMessage()) } certificates := response.GetCertificates() From 44d7d14bec10d49fbe02b0378fa382e8f50a3eb3 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 13:04:32 +0100 Subject: [PATCH 10/13] improve naming of repo operation vs secret refresh --- sensor/kubernetes/localscanner/tls_issuer.go | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index ba1647ab98cd2..2e355b908265d 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -20,6 +20,8 @@ var ( log = logging.LoggerForModule() _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) _ certificates.CertificateIssuer = (*localScannerTLSIssuerImpl)(nil) + _ certificates.CertificateIssuer = (*certIssuerImpl)(nil) + _ certSecretsRepo = (*certSecretsRepoImpl)(nil) ) // FIXME separate files for different structs @@ -51,6 +53,11 @@ type certSyncRequesterImpl struct { msgToSensorC chan *central.IssueLocalScannerCertsResponse } +type certSecretsRepo interface { + getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) + putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error +} + type certSecretsRepoImpl struct { // TODO secretsClient corev1.SecretInterface } @@ -107,7 +114,7 @@ func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) err // RefreshCertificates TODO doc // This is running in the goroutine for a refresh timer in i.certRefresher. func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) { - secrets, fetchErr := i.fetchSecrets(ctx) + secrets, fetchErr := i.getSecrets(ctx) if fetchErr != nil { return 0, fetchErr } @@ -130,7 +137,7 @@ func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh } certificates := response.GetCertificates() - if refreshErr := i.refreshSecrets(ctx, certificates, secrets); refreshErr != nil { + if refreshErr := i.updateSecrets(ctx, certificates, secrets); refreshErr != nil { return 0, refreshErr } timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) @@ -141,10 +148,12 @@ func (i *certIssuerImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1. return time.Now() // TODO } -func (i *certIssuerImpl) refreshSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet, +// updateSecrets stores the certificates in the data of each corresponding secret, and then persists +// the secrets in k8s. +func (i *certIssuerImpl) updateSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet, secrets map[storage.ServiceType]*v1.Secret) error { // TODO - return i.updateSecrets(secrets) + return i.putSecrets(ctx, secrets) } func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { @@ -179,11 +188,11 @@ func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*centr return response, nil } -func (i *certSecretsRepoImpl) fetchSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) { +func (i *certSecretsRepoImpl) getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) { secretsMap := make(map[storage.ServiceType]*v1.Secret, 3) return secretsMap, nil // TODO } -func (i *certSecretsRepoImpl) updateSecrets(secrets map[storage.ServiceType]*v1.Secret) error { +func (i *certSecretsRepoImpl) putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error { return nil // TODO } From cfd3652469251d51c87ec960b96770b77774b6bc Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 18:49:38 +0100 Subject: [PATCH 11/13] use RetryTicker --- sensor/kubernetes/localscanner/tls_issuer.go | 32 ++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 2e355b908265d..d32fa010c0183 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -8,26 +8,26 @@ import ( "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/centralsensor" + "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/logging" "github.com/stackrox/rox/pkg/uuid" "github.com/stackrox/rox/sensor/common" - "github.com/stackrox/rox/sensor/kubernetes/certificates" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" ) var ( - log = logging.LoggerForModule() - _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) - _ certificates.CertificateIssuer = (*localScannerTLSIssuerImpl)(nil) - _ certificates.CertificateIssuer = (*certIssuerImpl)(nil) + log = logging.LoggerForModule() + _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) + // _ certificates.CertificateIssuer = (*localScannerTLSIssuerImpl)(nil) + // _ certificates.CertificateIssuer = (*certIssuerImpl)(nil) _ certSecretsRepo = (*certSecretsRepoImpl)(nil) ) // FIXME separate files for different structs type localScannerTLSIssuerImpl struct { - conf config - certRefresher certificates.CertRefresher + conf config + certRefreshTicker *concurrency.RetryTicker certIssuerImpl sensorComponentImpl } @@ -65,10 +65,18 @@ type certSecretsRepoImpl struct { func (i *localScannerTLSIssuerImpl) Start() error { log.Info("starting local scanner TLS issuer.") - i.certRefresher = certificates.NewCertRefresher("FIXME desc", i, i.conf.certRefresherBackoff) - if err := i.certRefresher.Start(); err != nil { - return err + i.certRefreshTicker = concurrency.NewRetryTicker(i.RefreshCertificates, + time.Second, + i.conf.certRefresherBackoff) + certsDescription := "local scanner credentials" + i.certRefreshTicker.OnTickSuccess = func(nextTimeToTick time.Duration) { + log.Infof("successfully refreshed %v", certsDescription) + log.Infof("%v scheduled to be refreshed in %s", certsDescription, nextTimeToTick) } + i.certRefreshTicker.OnTickError = func(refreshErr error) { + log.Errorf("refreshing %s: %s", certsDescription, refreshErr) + } + i.certRefreshTicker.Start() log.Info("local scanner TLS issuer started.") @@ -76,8 +84,8 @@ func (i *localScannerTLSIssuerImpl) Start() error { } func (i *localScannerTLSIssuerImpl) Stop(err error) { - if i.certRefresher != nil { - i.certRefresher.Stop() + if i.certRefreshTicker != nil { + i.certRefreshTicker.Stop() } log.Info("local scanner TLS issuer stopped.") } From 24024cdd8e8a0f1b527bd5186b5f11f659ef682a Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 19:51:37 +0100 Subject: [PATCH 12/13] refactoring WIP --- pkg/centralsensor/caps_list.go | 3 + .../kubernetes/localscanner/cert_refresher.go | 107 +++++++++++ .../localscanner/certificate_requester.go | 63 +++++++ .../localscanner/secrets_repository.go | 30 +++ sensor/kubernetes/localscanner/tls_issuer.go | 177 +++--------------- 5 files changed, 228 insertions(+), 152 deletions(-) create mode 100644 sensor/kubernetes/localscanner/cert_refresher.go create mode 100644 sensor/kubernetes/localscanner/certificate_requester.go create mode 100644 sensor/kubernetes/localscanner/secrets_repository.go diff --git a/pkg/centralsensor/caps_list.go b/pkg/centralsensor/caps_list.go index e31cd51e8a556..e0b85406eee2b 100644 --- a/pkg/centralsensor/caps_list.go +++ b/pkg/centralsensor/caps_list.go @@ -24,4 +24,7 @@ const ( // AuditLogEventsCap identifies the capability to handle audit log event detection. AuditLogEventsCap SensorCapability = "AuditLogEvents" + + // LocalScannerCredentialsRefresh identifies the capability to maintain the Local scanner TLS credentials refreshed. + LocalScannerCredentialsRefresh SensorCapability = "LocalScannerCredentialsRefresh" ) diff --git a/sensor/kubernetes/localscanner/cert_refresher.go b/sensor/kubernetes/localscanner/cert_refresher.go new file mode 100644 index 0000000000000..c7a5f4baa7426 --- /dev/null +++ b/sensor/kubernetes/localscanner/cert_refresher.go @@ -0,0 +1,107 @@ +package localscanner + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/concurrency" + "github.com/stackrox/rox/pkg/logging" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" +) + +var ( + log = logging.LoggerForModule() + certsDescription = "local scanner credentials" + _ certRefresher = (*certRefresherImpl)(nil) +) + +type certRefresher interface { + Start() + Stop() +} + +func newCertRefresher(requestCertificates requestCertificatesFunc, timeout time.Duration, + backoff wait.Backoff) certRefresher { + return &certRefresherImpl{ + requestCertificates: requestCertificates, + certRefreshTimeout: timeout, + certRefreshBackoff: backoff, + } +} + +type certRefresherImpl struct { + requestCertificates requestCertificatesFunc + certRefreshTimeout time.Duration + certRefreshBackoff wait.Backoff + certRefreshTicker *concurrency.RetryTicker + certSecretsRepoImpl // FIXME to composition +} + +type requestCertificatesFunc func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) + +func (i *certRefresherImpl) Start() { + i.certRefreshTicker = concurrency.NewRetryTicker(i.RefreshCertificates, i.certRefreshTimeout, i.certRefreshBackoff) + i.certRefreshTicker.OnTickSuccess = i.logRefreshSuccess + i.certRefreshTicker.OnTickError = i.logRefreshError + i.certRefreshTicker.Start() +} + +func (i *certRefresherImpl) Stop() { + if i.certRefreshTicker != nil { + i.certRefreshTicker.Stop() + } +} + +// RefreshCertificates determines refreshes the certificate secrets if needed, and returns the time +// until the next refresh. +// This is running in the goroutine for a refresh timer in i.certRefresher. +func (i *certRefresherImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) { + secrets, fetchErr := i.getSecrets(ctx) + if fetchErr != nil { + return 0, fetchErr + } + timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) + if timeToRefresh > 0 { + return timeToRefresh, nil + } + + response, requestErr := i.requestCertificates(ctx) + if requestErr != nil { + return 0, requestErr + } + if response.GetError() != nil { + return 0, errors.Errorf("central refused to issue certificates: %s", response.GetError().GetMessage()) + } + + certificates := response.GetCertificates() + if refreshErr := i.updateSecrets(ctx, certificates, secrets); refreshErr != nil { + return 0, refreshErr + } + timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) + return timeToRefresh, nil +} + +func (i *certRefresherImpl) logRefreshSuccess(nextTimeToTick time.Duration) { + log.Infof("successfully refreshed %v", certsDescription) + log.Infof("%v scheduled to be refreshed in %s", certsDescription, nextTimeToTick) +} + +func (i *certRefresherImpl) logRefreshError(refreshErr error) { + log.Errorf("refreshing %s: %s", certsDescription, refreshErr) +} + +func (i *certRefresherImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1.Secret) time.Time { + return time.Now() // TODO +} + +// updateSecrets stores the certificates in the data of each corresponding secret, and then persists +// the secrets in k8s. +func (i *certRefresherImpl) updateSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet, + secrets map[storage.ServiceType]*v1.Secret) error { + // TODO + return i.putSecrets(ctx, secrets) +} diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go new file mode 100644 index 0000000000000..4fca34482d70f --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -0,0 +1,63 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/uuid" +) + +var ( + _ CertificateRequester = (*certRequesterSyncImpl)(nil) +) + +type CertificateRequester interface { + RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) +} + +func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, + msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { + return &certRequesterSyncImpl{ + requestID: uuid.NewV4().String(), + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + } +} + +type certRequesterSyncImpl struct { + requestID string + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse +} + +func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: i.requestID, + }, + }, + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case i.msgFromSensorC <- msg: + log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + } + + var response *central.IssueLocalScannerCertsResponse + for response == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case newResponse := <-i.msgToSensorC: + if newResponse.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) + } else { + response = newResponse + } + } + } + + return response, nil +} \ No newline at end of file diff --git a/sensor/kubernetes/localscanner/secrets_repository.go b/sensor/kubernetes/localscanner/secrets_repository.go new file mode 100644 index 0000000000000..39b59c8ed094a --- /dev/null +++ b/sensor/kubernetes/localscanner/secrets_repository.go @@ -0,0 +1,30 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/storage" + v1 "k8s.io/api/core/v1" +) + +var ( + _ certSecretsRepo = (*certSecretsRepoImpl)(nil) +) + +type certSecretsRepo interface { + getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) + putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error +} + +type certSecretsRepoImpl struct { + // TODO secretsClient corev1.SecretInterface +} + +func (i *certSecretsRepoImpl) getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) { + secretsMap := make(map[storage.ServiceType]*v1.Secret, 3) + return secretsMap, nil // TODO +} + +func (i *certSecretsRepoImpl) putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error { + return nil // TODO +} diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index d32fa010c0183..5961bcd57d252 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -4,99 +4,58 @@ import ( "context" "time" - "github.com/pkg/errors" "github.com/stackrox/rox/generated/internalapi/central" - "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/centralsensor" - "github.com/stackrox/rox/pkg/concurrency" - "github.com/stackrox/rox/pkg/logging" - "github.com/stackrox/rox/pkg/uuid" "github.com/stackrox/rox/sensor/common" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" ) var ( - log = logging.LoggerForModule() - _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) - // _ certificates.CertificateIssuer = (*localScannerTLSIssuerImpl)(nil) - // _ certificates.CertificateIssuer = (*certIssuerImpl)(nil) - _ certSecretsRepo = (*certSecretsRepoImpl)(nil) + _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) ) -// FIXME separate files for different structs -type localScannerTLSIssuerImpl struct { - conf config - certRefreshTicker *concurrency.RetryTicker - certIssuerImpl - sensorComponentImpl -} - -type config struct { - // TODO sensorNamespace string - certRefresherBackoff wait.Backoff -} - -type sensorComponentImpl struct { - msgFromSensorC chan *central.MsgFromSensor - msgToSensorC chan *central.IssueLocalScannerCertsResponse -} - -type certIssuerImpl struct { - sensorComponentImpl - certSecretsRepoImpl -} - -type certSyncRequesterImpl struct { - requestID string - msgFromSensorC chan *central.MsgFromSensor - msgToSensorC chan *central.IssueLocalScannerCertsResponse -} - -type certSecretsRepo interface { - getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) - putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error +func NewLocalScannerTLSIssuer(certRefreshTimeout time.Duration, certRefreshBackoff wait.Backoff) common.SensorComponent { + msgFromSensorC := make(msgFromSensorChan) + msgToSensorC := make(msgToSensorChan) + requestCertificates := func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + certRequester := NewCertificateRequester(msgFromSensorC, msgToSensorC) + return certRequester.RequestCertificates(ctx) + } + return &localScannerTLSIssuerImpl{ + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + certRefresher: newCertRefresher(requestCertificates, certRefreshTimeout, certRefreshBackoff), + } } -type certSecretsRepoImpl struct { - // TODO secretsClient corev1.SecretInterface +type msgFromSensorChan chan *central.MsgFromSensor +type msgToSensorChan chan *central.IssueLocalScannerCertsResponse +type localScannerTLSIssuerImpl struct { + msgFromSensorC msgFromSensorChan + msgToSensorC msgToSensorChan + certRefresher certRefresher } func (i *localScannerTLSIssuerImpl) Start() error { log.Info("starting local scanner TLS issuer.") - - i.certRefreshTicker = concurrency.NewRetryTicker(i.RefreshCertificates, - time.Second, - i.conf.certRefresherBackoff) - certsDescription := "local scanner credentials" - i.certRefreshTicker.OnTickSuccess = func(nextTimeToTick time.Duration) { - log.Infof("successfully refreshed %v", certsDescription) - log.Infof("%v scheduled to be refreshed in %s", certsDescription, nextTimeToTick) - } - i.certRefreshTicker.OnTickError = func(refreshErr error) { - log.Errorf("refreshing %s: %s", certsDescription, refreshErr) - } - i.certRefreshTicker.Start() - + i.certRefresher.Start() log.Info("local scanner TLS issuer started.") return nil } func (i *localScannerTLSIssuerImpl) Stop(err error) { - if i.certRefreshTicker != nil { - i.certRefreshTicker.Stop() - } + i.certRefresher.Stop() log.Info("local scanner TLS issuer stopped.") } -func (i *sensorComponentImpl) Capabilities() []centralsensor.SensorCapability { - return []centralsensor.SensorCapability{} // FIXME +func (i *localScannerTLSIssuerImpl) Capabilities() []centralsensor.SensorCapability { + return []centralsensor.SensorCapability{centralsensor.LocalScannerCredentialsRefresh} } // ResponsesC is called "responses" because for other SensorComponent it is central that // initiates the interaction. However, here it is sensor which sends a request to central. -func (i *sensorComponentImpl) ResponsesC() <-chan *central.MsgFromSensor { +func (i *localScannerTLSIssuerImpl) ResponsesC() <-chan *central.MsgFromSensor { return i.msgFromSensorC } @@ -117,90 +76,4 @@ func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) err // all returned errors with error level. return nil } -} - -// RefreshCertificates TODO doc -// This is running in the goroutine for a refresh timer in i.certRefresher. -func (i *certIssuerImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) { - secrets, fetchErr := i.getSecrets(ctx) - if fetchErr != nil { - return 0, fetchErr - } - timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) - if timeToRefresh > 0 { - return timeToRefresh, nil - } - - certRequester := &certSyncRequesterImpl{ - requestID: uuid.NewV4().String(), - msgFromSensorC: i.msgFromSensorC, - msgToSensorC: i.msgToSensorC, - } - response, requestErr := certRequester.requestCertificates(ctx) - if requestErr != nil { - return 0, requestErr - } - if response.GetError() != nil { - return 0, errors.Errorf("central refused to issue certificates: %s", response.GetError().GetMessage()) - } - - certificates := response.GetCertificates() - if refreshErr := i.updateSecrets(ctx, certificates, secrets); refreshErr != nil { - return 0, refreshErr - } - timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) - return timeToRefresh, nil -} - -func (i *certIssuerImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1.Secret) time.Time { - return time.Now() // TODO -} - -// updateSecrets stores the certificates in the data of each corresponding secret, and then persists -// the secrets in k8s. -func (i *certIssuerImpl) updateSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet, - secrets map[storage.ServiceType]*v1.Secret) error { - // TODO - return i.putSecrets(ctx, secrets) -} - -func (i *certSyncRequesterImpl) requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - msg := ¢ral.MsgFromSensor{ - Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ - IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ - RequestId: i.requestID, - }, - }, - } - select { - case <-ctx.Done(): - return nil, ctx.Err() - case i.msgFromSensorC <- msg: - log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) - } - - var response *central.IssueLocalScannerCertsResponse - for response == nil { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case newResponse := <-i.msgToSensorC: - if newResponse.GetRequestId() != i.requestID { - log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) - } else { - response = newResponse - } - } - } - - return response, nil -} - -func (i *certSecretsRepoImpl) getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) { - secretsMap := make(map[storage.ServiceType]*v1.Secret, 3) - return secretsMap, nil // TODO -} - -func (i *certSecretsRepoImpl) putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error { - return nil // TODO -} +} \ No newline at end of file From 5753ddb3cfd39ea256d253612f9953a31e9caaa5 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 25 Jan 2022 20:11:22 +0100 Subject: [PATCH 13/13] fix style --- sensor/kubernetes/localscanner/cert_refresher.go | 16 ++++++++-------- .../localscanner/certificate_requester.go | 7 +++++-- sensor/kubernetes/localscanner/tls_issuer.go | 14 ++++++++------ 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/sensor/kubernetes/localscanner/cert_refresher.go b/sensor/kubernetes/localscanner/cert_refresher.go index c7a5f4baa7426..4c69e25a47f6a 100644 --- a/sensor/kubernetes/localscanner/cert_refresher.go +++ b/sensor/kubernetes/localscanner/cert_refresher.go @@ -14,9 +14,9 @@ import ( ) var ( - log = logging.LoggerForModule() - certsDescription = "local scanner credentials" - _ certRefresher = (*certRefresherImpl)(nil) + log = logging.LoggerForModule() + certsDescription = "local scanner credentials" + _ certRefresher = (*certRefresherImpl)(nil) ) type certRefresher interface { @@ -28,16 +28,16 @@ func newCertRefresher(requestCertificates requestCertificatesFunc, timeout time. backoff wait.Backoff) certRefresher { return &certRefresherImpl{ requestCertificates: requestCertificates, - certRefreshTimeout: timeout, - certRefreshBackoff: backoff, + certRefreshTimeout: timeout, + certRefreshBackoff: backoff, } } type certRefresherImpl struct { requestCertificates requestCertificatesFunc - certRefreshTimeout time.Duration - certRefreshBackoff wait.Backoff - certRefreshTicker *concurrency.RetryTicker + certRefreshTimeout time.Duration + certRefreshBackoff wait.Backoff + certRefreshTicker *concurrency.RetryTicker certSecretsRepoImpl // FIXME to composition } diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 4fca34482d70f..ab62d7167b771 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -11,13 +11,16 @@ var ( _ CertificateRequester = (*certRequesterSyncImpl)(nil) ) +// CertificateRequester request a new set of local scanner certificates to central. type CertificateRequester interface { RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) } +// NewCertificateRequester creates a new CertificateRequester that communicates through +// the specified channels, and that uses a fresh request ID. func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { - return &certRequesterSyncImpl{ + return &certRequesterSyncImpl{ requestID: uuid.NewV4().String(), msgFromSensorC: msgFromSensorC, msgToSensorC: msgToSensorC, @@ -60,4 +63,4 @@ func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*centr } return response, nil -} \ No newline at end of file +} diff --git a/sensor/kubernetes/localscanner/tls_issuer.go b/sensor/kubernetes/localscanner/tls_issuer.go index 5961bcd57d252..53b327c71c3ca 100644 --- a/sensor/kubernetes/localscanner/tls_issuer.go +++ b/sensor/kubernetes/localscanner/tls_issuer.go @@ -11,9 +11,11 @@ import ( ) var ( - _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) + _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) ) +// NewLocalScannerTLSIssuer creates a sensor component that will keep the local scanner certificates +// fresh, using the specified retry parameters. func NewLocalScannerTLSIssuer(certRefreshTimeout time.Duration, certRefreshBackoff wait.Backoff) common.SensorComponent { msgFromSensorC := make(msgFromSensorChan) msgToSensorC := make(msgToSensorChan) @@ -23,17 +25,17 @@ func NewLocalScannerTLSIssuer(certRefreshTimeout time.Duration, certRefreshBacko } return &localScannerTLSIssuerImpl{ msgFromSensorC: msgFromSensorC, - msgToSensorC: msgToSensorC, - certRefresher: newCertRefresher(requestCertificates, certRefreshTimeout, certRefreshBackoff), + msgToSensorC: msgToSensorC, + certRefresher: newCertRefresher(requestCertificates, certRefreshTimeout, certRefreshBackoff), } } type msgFromSensorChan chan *central.MsgFromSensor -type msgToSensorChan chan *central.IssueLocalScannerCertsResponse +type msgToSensorChan chan *central.IssueLocalScannerCertsResponse type localScannerTLSIssuerImpl struct { msgFromSensorC msgFromSensorChan msgToSensorC msgToSensorChan - certRefresher certRefresher + certRefresher certRefresher } func (i *localScannerTLSIssuerImpl) Start() error { @@ -76,4 +78,4 @@ func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) err // all returned errors with error level. return nil } -} \ No newline at end of file +}