-
Notifications
You must be signed in to change notification settings - Fork 171
ROX-9014: Draft TLSIssuer as a SensorComponent and CertificateIssuer #400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
cde681b
Draft TLSIssuer as a sensor component and certificate source
67fe75e
Fix style
ece084a
simplify TLS issuer
ac3d87b
remove unused code
a10b805
fix typo bug and minor cleanup
9ca0366
remove context param of Start method in CertRefresher
fe511bb
pass context to all subcalls in RefreshCertificates
dac9bb6
rename requestsC and responsesC to msgFromSensorC and msgToSensorC
32f8cd1
improve comments and logging
44d7d14
improve naming of repo operation vs secret refresh
cfd3652
use RetryTicker
24024cd
refactoring WIP
5753ddb
fix style
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| package localscanner | ||
|
|
||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "github.com/stackrox/rox/generated/internalapi/central" | ||
| "github.com/stackrox/rox/generated/storage" | ||
| "github.com/stackrox/rox/pkg/concurrency" | ||
| "github.com/stackrox/rox/pkg/logging" | ||
| v1 "k8s.io/api/core/v1" | ||
| "k8s.io/apimachinery/pkg/util/wait" | ||
| ) | ||
|
|
||
| var ( | ||
| log = logging.LoggerForModule() | ||
| certsDescription = "local scanner credentials" | ||
| _ certRefresher = (*certRefresherImpl)(nil) | ||
| ) | ||
|
|
||
| type certRefresher interface { | ||
| Start() | ||
| Stop() | ||
| } | ||
|
|
||
| func newCertRefresher(requestCertificates requestCertificatesFunc, timeout time.Duration, | ||
| backoff wait.Backoff) certRefresher { | ||
| return &certRefresherImpl{ | ||
| requestCertificates: requestCertificates, | ||
| certRefreshTimeout: timeout, | ||
| certRefreshBackoff: backoff, | ||
| } | ||
| } | ||
|
|
||
| type certRefresherImpl struct { | ||
| requestCertificates requestCertificatesFunc | ||
| certRefreshTimeout time.Duration | ||
| certRefreshBackoff wait.Backoff | ||
| certRefreshTicker *concurrency.RetryTicker | ||
| certSecretsRepoImpl // FIXME to composition | ||
| } | ||
|
|
||
| type requestCertificatesFunc func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) | ||
|
|
||
| func (i *certRefresherImpl) Start() { | ||
| i.certRefreshTicker = concurrency.NewRetryTicker(i.RefreshCertificates, i.certRefreshTimeout, i.certRefreshBackoff) | ||
| i.certRefreshTicker.OnTickSuccess = i.logRefreshSuccess | ||
| i.certRefreshTicker.OnTickError = i.logRefreshError | ||
| i.certRefreshTicker.Start() | ||
| } | ||
|
|
||
| func (i *certRefresherImpl) Stop() { | ||
| if i.certRefreshTicker != nil { | ||
| i.certRefreshTicker.Stop() | ||
| } | ||
| } | ||
|
|
||
| // RefreshCertificates determines refreshes the certificate secrets if needed, and returns the time | ||
| // until the next refresh. | ||
| // This is running in the goroutine for a refresh timer in i.certRefresher. | ||
| func (i *certRefresherImpl) RefreshCertificates(ctx context.Context) (timeToRefresh time.Duration, err error) { | ||
| secrets, fetchErr := i.getSecrets(ctx) | ||
| if fetchErr != nil { | ||
| return 0, fetchErr | ||
| } | ||
| timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) | ||
| if timeToRefresh > 0 { | ||
| return timeToRefresh, nil | ||
| } | ||
|
|
||
| response, requestErr := i.requestCertificates(ctx) | ||
| if requestErr != nil { | ||
| return 0, requestErr | ||
| } | ||
| if response.GetError() != nil { | ||
| return 0, errors.Errorf("central refused to issue certificates: %s", response.GetError().GetMessage()) | ||
| } | ||
|
|
||
| certificates := response.GetCertificates() | ||
| if refreshErr := i.updateSecrets(ctx, certificates, secrets); refreshErr != nil { | ||
| return 0, refreshErr | ||
| } | ||
| timeToRefresh = time.Until(i.getCertRenewalTime(secrets)) | ||
| return timeToRefresh, nil | ||
| } | ||
|
|
||
| func (i *certRefresherImpl) logRefreshSuccess(nextTimeToTick time.Duration) { | ||
| log.Infof("successfully refreshed %v", certsDescription) | ||
| log.Infof("%v scheduled to be refreshed in %s", certsDescription, nextTimeToTick) | ||
| } | ||
|
|
||
| func (i *certRefresherImpl) logRefreshError(refreshErr error) { | ||
| log.Errorf("refreshing %s: %s", certsDescription, refreshErr) | ||
| } | ||
|
|
||
| func (i *certRefresherImpl) getCertRenewalTime(secrets map[storage.ServiceType]*v1.Secret) time.Time { | ||
| return time.Now() // TODO | ||
| } | ||
|
|
||
| // updateSecrets stores the certificates in the data of each corresponding secret, and then persists | ||
| // the secrets in k8s. | ||
| func (i *certRefresherImpl) updateSecrets(ctx context.Context, certificates *storage.TypedServiceCertificateSet, | ||
| secrets map[storage.ServiceType]*v1.Secret) error { | ||
| // TODO | ||
| return i.putSecrets(ctx, secrets) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| package localscanner | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "github.com/stackrox/rox/generated/internalapi/central" | ||
| "github.com/stackrox/rox/pkg/uuid" | ||
| ) | ||
|
|
||
| var ( | ||
| _ CertificateRequester = (*certRequesterSyncImpl)(nil) | ||
| ) | ||
|
|
||
| // CertificateRequester request a new set of local scanner certificates to central. | ||
| type CertificateRequester interface { | ||
| RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) | ||
| } | ||
|
|
||
| // NewCertificateRequester creates a new CertificateRequester that communicates through | ||
| // the specified channels, and that uses a fresh request ID. | ||
| func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, | ||
| msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { | ||
| return &certRequesterSyncImpl{ | ||
| requestID: uuid.NewV4().String(), | ||
| msgFromSensorC: msgFromSensorC, | ||
| msgToSensorC: msgToSensorC, | ||
| } | ||
| } | ||
|
|
||
| type certRequesterSyncImpl struct { | ||
| requestID string | ||
| msgFromSensorC chan *central.MsgFromSensor | ||
| msgToSensorC chan *central.IssueLocalScannerCertsResponse | ||
| } | ||
|
|
||
| func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { | ||
| msg := ¢ral.MsgFromSensor{ | ||
| Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ | ||
| IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ | ||
| RequestId: i.requestID, | ||
| }, | ||
| }, | ||
| } | ||
| select { | ||
| case <-ctx.Done(): | ||
| return nil, ctx.Err() | ||
| case i.msgFromSensorC <- msg: | ||
| log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) | ||
| } | ||
|
|
||
| var response *central.IssueLocalScannerCertsResponse | ||
| for response == nil { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return nil, ctx.Err() | ||
| case newResponse := <-i.msgToSensorC: | ||
| if newResponse.GetRequestId() != i.requestID { | ||
| log.Debugf("ignoring response with unknown request id %s", response.GetRequestId()) | ||
| } else { | ||
| response = newResponse | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return response, nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| package localscanner | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "github.com/stackrox/rox/generated/storage" | ||
| v1 "k8s.io/api/core/v1" | ||
| ) | ||
|
|
||
| var ( | ||
| _ certSecretsRepo = (*certSecretsRepoImpl)(nil) | ||
| ) | ||
|
|
||
| type certSecretsRepo interface { | ||
| getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) | ||
| putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error | ||
| } | ||
|
|
||
| type certSecretsRepoImpl struct { | ||
| // TODO secretsClient corev1.SecretInterface | ||
| } | ||
|
|
||
| func (i *certSecretsRepoImpl) getSecrets(ctx context.Context) (map[storage.ServiceType]*v1.Secret, error) { | ||
| secretsMap := make(map[storage.ServiceType]*v1.Secret, 3) | ||
| return secretsMap, nil // TODO | ||
| } | ||
|
|
||
| func (i *certSecretsRepoImpl) putSecrets(ctx context.Context, secrets map[storage.ServiceType]*v1.Secret) error { | ||
| return nil // TODO | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| package localscanner | ||
|
|
||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| "github.com/stackrox/rox/generated/internalapi/central" | ||
| "github.com/stackrox/rox/pkg/centralsensor" | ||
| "github.com/stackrox/rox/sensor/common" | ||
| "k8s.io/apimachinery/pkg/util/wait" | ||
| ) | ||
|
|
||
| var ( | ||
| _ common.SensorComponent = (*localScannerTLSIssuerImpl)(nil) | ||
| ) | ||
|
|
||
| // NewLocalScannerTLSIssuer creates a sensor component that will keep the local scanner certificates | ||
| // fresh, using the specified retry parameters. | ||
| func NewLocalScannerTLSIssuer(certRefreshTimeout time.Duration, certRefreshBackoff wait.Backoff) common.SensorComponent { | ||
| msgFromSensorC := make(msgFromSensorChan) | ||
| msgToSensorC := make(msgToSensorChan) | ||
| requestCertificates := func(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { | ||
| certRequester := NewCertificateRequester(msgFromSensorC, msgToSensorC) | ||
| return certRequester.RequestCertificates(ctx) | ||
| } | ||
| return &localScannerTLSIssuerImpl{ | ||
| msgFromSensorC: msgFromSensorC, | ||
| msgToSensorC: msgToSensorC, | ||
| certRefresher: newCertRefresher(requestCertificates, certRefreshTimeout, certRefreshBackoff), | ||
| } | ||
| } | ||
|
|
||
| type msgFromSensorChan chan *central.MsgFromSensor | ||
| type msgToSensorChan chan *central.IssueLocalScannerCertsResponse | ||
| type localScannerTLSIssuerImpl struct { | ||
| msgFromSensorC msgFromSensorChan | ||
| msgToSensorC msgToSensorChan | ||
| certRefresher certRefresher | ||
| } | ||
|
|
||
| func (i *localScannerTLSIssuerImpl) Start() error { | ||
| log.Info("starting local scanner TLS issuer.") | ||
| i.certRefresher.Start() | ||
| log.Info("local scanner TLS issuer started.") | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (i *localScannerTLSIssuerImpl) Stop(err error) { | ||
| i.certRefresher.Stop() | ||
| log.Info("local scanner TLS issuer stopped.") | ||
| } | ||
|
|
||
| func (i *localScannerTLSIssuerImpl) Capabilities() []centralsensor.SensorCapability { | ||
| return []centralsensor.SensorCapability{centralsensor.LocalScannerCredentialsRefresh} | ||
| } | ||
|
|
||
| // ResponsesC is called "responses" because for other SensorComponent it is central that | ||
| // initiates the interaction. However, here it is sensor which sends a request to central. | ||
| func (i *localScannerTLSIssuerImpl) ResponsesC() <-chan *central.MsgFromSensor { | ||
| return i.msgFromSensorC | ||
| } | ||
|
|
||
| // ProcessMessage is how the central receiver delivers messages from central to SensorComponents. | ||
| // This method must not block as it would prevent centralReceiverImpl from sending messages | ||
| // to other SensorComponents. | ||
| func (i *localScannerTLSIssuerImpl) ProcessMessage(msg *central.MsgToSensor) error { | ||
| switch m := msg.GetMsg().(type) { | ||
| case *central.MsgToSensor_IssueLocalScannerCertsResponse: | ||
| response := m.IssueLocalScannerCertsResponse | ||
| go func() { | ||
| // will block if i.resultC is filled. | ||
| i.msgToSensorC <- response | ||
| }() | ||
| return nil | ||
| default: | ||
| // silently ignore other messages broadcasted by centralReceiverImpl, as centralReceiverImpl logs | ||
| // all returned errors with error level. | ||
| return nil | ||
| } | ||
| } |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be relatively simple to create fake k8s clients if you want to use it for testing.