From 3cc04b38f7483b26e09d8bd39c3cddc5cb0d7f3c Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 26 Jan 2022 12:18:01 +0100 Subject: [PATCH 1/6] Create CertificateRequester to request certificates from central --- .../localscanner/certificate_requester.go | 68 ++++++++++++++ .../certificate_requester_test.go | 93 +++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 sensor/kubernetes/localscanner/certificate_requester.go create mode 100644 sensor/kubernetes/localscanner/certificate_requester_test.go diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go new file mode 100644 index 0000000000000..cf7af59cb819d --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -0,0 +1,68 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/logging" + "github.com/stackrox/rox/pkg/uuid" +) + +var ( + _ CertificateRequester = (*certRequesterSyncImpl)(nil) + log = logging.LoggerForModule() +) + +// CertificateRequester request a new set of local scanner certificates to central. +type CertificateRequester interface { + RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) +} + +// NewCertificateRequester creates a new CertificateRequester that communicates through +// the specified channels, and that uses a fresh request ID. +func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, + msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { + return &certRequesterSyncImpl{ + requestID: uuid.NewV4().String(), + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + } +} + +type certRequesterSyncImpl struct { + requestID string + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse +} + +func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: i.requestID, + }, + }, + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case i.msgFromSensorC <- msg: + log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + } + + var response *central.IssueLocalScannerCertsResponse + for response == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case newResponse := <-i.msgToSensorC: + if newResponse.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) + } else { + response = newResponse + } + } + } + + return response, nil +} \ No newline at end of file diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go new file mode 100644 index 0000000000000..deb272286f277 --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -0,0 +1,93 @@ +package localscanner + +import ( + "context" + "testing" + "time" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/concurrency" + "github.com/stretchr/testify/suite" +) + +func TestHandler(t *testing.T) { + suite.Run(t, new(certificateRequesterSuite)) +} + +type certificateRequesterSuite struct { + suite.Suite +} + +type fixture struct { + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse + requester CertificateRequester +} + +func newFixture() *fixture { + msgFromSensorC := make(chan *central.MsgFromSensor) + msgToSensorC := make(chan *central.IssueLocalScannerCertsResponse) + return &fixture{ + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), + } +} + +func (s *certificateRequesterSuite) TestRequestCancellation() { + f := newFixture() + requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) + doneErrSig := concurrency.NewErrorSignal() + + go func() { + certs, err := f.requester.RequestCertificates(requestCtx) + s.Nil(certs) + doneErrSig.SignalWithError(err) + }() + cancelRequestCtx() + + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) + defer cancelWaitCtx() + requestErr, ok := doneErrSig.WaitUntil(waitCtx) + s.Require().True(ok) + s.Equal(context.Canceled, requestErr) +} + +func (s *certificateRequesterSuite) TestRequestSuccess() { + f := newFixture() + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) + defer cancelWaitCtx() + doneErrSig := concurrency.NewErrorSignal() + expectedResponseC := make(chan *central.IssueLocalScannerCertsResponse) + + go func() { + response, err := f.requester.RequestCertificates(waitCtx) + expectedResponse := <-expectedResponseC + s.Equal(expectedResponse, response) + s.Nil(err) + doneErrSig.Signal() + }() + + go func() { + select { + case <-waitCtx.Done(): + return + case request := <-f.msgFromSensorC: + s.Require().NotNil(request.GetIssueLocalScannerCertsRequest()) + requestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() + s.Require().NotEmpty(requestID) + // should be ignored. + f.msgToSensorC <-¢ral.IssueLocalScannerCertsResponse{ + RequestId: "", + } + expectedResponse := ¢ral.IssueLocalScannerCertsResponse{ + RequestId: requestID, + } + f.msgToSensorC <-expectedResponse + expectedResponseC <-expectedResponse + } + }() + + _, ok := doneErrSig.WaitUntil(waitCtx) + s.Require().True(ok) +} From ff4b1c2183670dd7aa7b6178491f2452cfba00a3 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 26 Jan 2022 12:21:12 +0100 Subject: [PATCH 2/6] fix style --- .../kubernetes/localscanner/certificate_requester.go | 6 +++--- .../localscanner/certificate_requester_test.go | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index cf7af59cb819d..623f0d549c4af 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -9,8 +9,8 @@ import ( ) var ( - _ CertificateRequester = (*certRequesterSyncImpl)(nil) - log = logging.LoggerForModule() + _ CertificateRequester = (*certRequesterSyncImpl)(nil) + log = logging.LoggerForModule() ) // CertificateRequester request a new set of local scanner certificates to central. @@ -65,4 +65,4 @@ func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*centr } return response, nil -} \ No newline at end of file +} diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index deb272286f277..7a40fd7d695a1 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -29,8 +29,8 @@ func newFixture() *fixture { msgToSensorC := make(chan *central.IssueLocalScannerCertsResponse) return &fixture{ msgFromSensorC: msgFromSensorC, - msgToSensorC: msgToSensorC, - requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), + msgToSensorC: msgToSensorC, + requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), } } @@ -77,14 +77,14 @@ func (s *certificateRequesterSuite) TestRequestSuccess() { requestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() s.Require().NotEmpty(requestID) // should be ignored. - f.msgToSensorC <-¢ral.IssueLocalScannerCertsResponse{ + f.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ RequestId: "", } expectedResponse := ¢ral.IssueLocalScannerCertsResponse{ RequestId: requestID, } - f.msgToSensorC <-expectedResponse - expectedResponseC <-expectedResponse + f.msgToSensorC <- expectedResponse + expectedResponseC <- expectedResponse } }() From d65f01f6938920f0fe9d9883a20f889ae9344d6c Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 12:55:56 +0100 Subject: [PATCH 3/6] support concurrent requests --- .../localscanner/certificate_request.go | 54 +++++++++++++ .../localscanner/certificate_requester.go | 81 ++++++++++--------- .../certificate_requester_test.go | 17 ++-- 3 files changed, 110 insertions(+), 42 deletions(-) create mode 100644 sensor/kubernetes/localscanner/certificate_request.go diff --git a/sensor/kubernetes/localscanner/certificate_request.go b/sensor/kubernetes/localscanner/certificate_request.go new file mode 100644 index 0000000000000..839bef25a864d --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_request.go @@ -0,0 +1,54 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/internalapi/central" +) + +var ( + _ certificateRequest = (*certRequestSyncImpl)(nil) +) + +// certificateRequest request a new set of local scanner certificates to central. +type certificateRequest interface { + requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) +} + +type certRequestSyncImpl struct { + requestID string + msgFromSensorC msgFromSensorC + msgToSensorC msgToSensorC +} + +func (i *certRequestSyncImpl) requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: i.requestID, + }, + }, + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case i.msgFromSensorC <- msg: + log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + } + + var response *central.IssueLocalScannerCertsResponse + for response == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case newResponse := <-i.msgToSensorC: + if newResponse.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) + } else { + response = newResponse + } + } + } + + return response, nil +} diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 623f0d549c4af..9df30dba898d9 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -2,67 +2,76 @@ package localscanner import ( "context" + "sync" "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/logging" "github.com/stackrox/rox/pkg/uuid" ) var ( - _ CertificateRequester = (*certRequesterSyncImpl)(nil) log = logging.LoggerForModule() + _ CertificateRequester = (*certificateRequesterImpl)(nil) ) // CertificateRequester request a new set of local scanner certificates to central. type CertificateRequester interface { + Start() + Stop() RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) } -// NewCertificateRequester creates a new CertificateRequester that communicates through -// the specified channels, and that uses a fresh request ID. -func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, - msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { - return &certRequesterSyncImpl{ - requestID: uuid.NewV4().String(), +// NewCertificateRequester creates a new certificateRequest manager that communicates through +// the specified channels, and that uses a fresh request ID for reach new request. +// TODO document this handles concurrent requests from several goroutines +func NewCertificateRequester(msgFromSensorC msgFromSensorC, msgToSensorC msgToSensorC) CertificateRequester { + return &certificateRequesterImpl{ + stopC: concurrency.NewErrorSignal(), msgFromSensorC: msgFromSensorC, msgToSensorC: msgToSensorC, } } -type certRequesterSyncImpl struct { - requestID string - msgFromSensorC chan *central.MsgFromSensor - msgToSensorC chan *central.IssueLocalScannerCertsResponse +type msgFromSensorC chan *central.MsgFromSensor +type msgToSensorC chan *central.IssueLocalScannerCertsResponse +type certificateRequesterImpl struct { + stopC concurrency.ErrorSignal + msgFromSensorC msgFromSensorC + msgToSensorC msgToSensorC + requests sync.Map } -func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - msg := ¢ral.MsgFromSensor{ - Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ - IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ - RequestId: i.requestID, - }, - }, - } - select { - case <-ctx.Done(): - return nil, ctx.Err() - case i.msgFromSensorC <- msg: - log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) - } +func (m *certificateRequesterImpl) Start() { + go m.forwardMessagesToSensor() +} + +func (m *certificateRequesterImpl) Stop() { + m.stopC.Signal() +} - var response *central.IssueLocalScannerCertsResponse - for response == nil { +func (m *certificateRequesterImpl) forwardMessagesToSensor() { + for { select { - case <-ctx.Done(): - return nil, ctx.Err() - case newResponse := <-i.msgToSensorC: - if newResponse.GetRequestId() != i.requestID { - log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) - } else { - response = newResponse + case <-m.stopC.Done(): + return + case msg := <-m.msgToSensorC: + requestC, ok := m.requests.Load(msg.GetRequestId()) + if ok { + requestC.(msgToSensorC) <- msg } } } - - return response, nil } + +func (m *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + certRequester := &certRequestSyncImpl{ + requestID: uuid.NewV4().String(), + msgFromSensorC: m.msgFromSensorC, + msgToSensorC: make(msgToSensorC), + } + m.requests.Store(certRequester.requestID, certRequester.msgToSensorC) + response, err := certRequester.requestCertificates(ctx) + m.requests.Delete(certRequester.requestID) + return response, err +} \ No newline at end of file diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 7a40fd7d695a1..8b11d56d589a0 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/suite" ) -func TestHandler(t *testing.T) { +func TestCertificateRequester(t *testing.T) { suite.Run(t, new(certificateRequesterSuite)) } @@ -19,14 +19,14 @@ type certificateRequesterSuite struct { } type fixture struct { - msgFromSensorC chan *central.MsgFromSensor - msgToSensorC chan *central.IssueLocalScannerCertsResponse - requester CertificateRequester + msgFromSensorC msgFromSensorC + msgToSensorC msgToSensorC + requester CertificateRequester } func newFixture() *fixture { - msgFromSensorC := make(chan *central.MsgFromSensor) - msgToSensorC := make(chan *central.IssueLocalScannerCertsResponse) + msgFromSensorC := make(msgFromSensorC) + msgToSensorC := make(msgToSensorC) return &fixture{ msgFromSensorC: msgFromSensorC, msgToSensorC: msgToSensorC, @@ -36,6 +36,9 @@ func newFixture() *fixture { func (s *certificateRequesterSuite) TestRequestCancellation() { f := newFixture() + f.requester.Start() // FIXME don't start and this is much simpler + defer f.requester.Stop() + requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) doneErrSig := concurrency.NewErrorSignal() @@ -55,6 +58,8 @@ func (s *certificateRequesterSuite) TestRequestCancellation() { func (s *certificateRequesterSuite) TestRequestSuccess() { f := newFixture() + f.requester.Start() + defer f.requester.Stop() waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) defer cancelWaitCtx() doneErrSig := concurrency.NewErrorSignal() From b2545f0a38a09c22cd455be68c6d331b551c5e5a Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 14:57:30 +0100 Subject: [PATCH 4/6] improve comments and logging --- .../localscanner/certificate_request.go | 3 ++- .../localscanner/certificate_requester.go | 27 ++++++++++--------- .../certificate_requester_test.go | 4 +-- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_request.go b/sensor/kubernetes/localscanner/certificate_request.go index 839bef25a864d..abe7c9910ec08 100644 --- a/sensor/kubernetes/localscanner/certificate_request.go +++ b/sensor/kubernetes/localscanner/certificate_request.go @@ -43,7 +43,8 @@ func (i *certRequestSyncImpl) requestCertificates(ctx context.Context) (*central return nil, ctx.Err() case newResponse := <-i.msgToSensorC: if newResponse.GetRequestId() != i.requestID { - log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) + log.Debugf("request id %q does not match %q, skipping request", response.GetRequestId(), + i.requestID) } else { response = newResponse } diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 9df30dba898d9..8409bad9f49f5 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -22,12 +22,13 @@ type CertificateRequester interface { RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) } -// NewCertificateRequester creates a new certificateRequest manager that communicates through -// the specified channels, and that uses a fresh request ID for reach new request. -// TODO document this handles concurrent requests from several goroutines +// NewCertificateRequester creates a new certificate requester that communicates through +// the specified channels and initializes a new request ID for reach request. +// To use it call Start, and then make requests with RequestCertificates, concurrent requests are supported. +// This assumes that the certificate requester is the only consumer of msgToSensorC. func NewCertificateRequester(msgFromSensorC msgFromSensorC, msgToSensorC msgToSensorC) CertificateRequester { return &certificateRequesterImpl{ - stopC: concurrency.NewErrorSignal(), + stopC: concurrency.NewErrorSignal(), msgFromSensorC: msgFromSensorC, msgToSensorC: msgToSensorC, } @@ -36,10 +37,10 @@ func NewCertificateRequester(msgFromSensorC msgFromSensorC, msgToSensorC msgToSe type msgFromSensorC chan *central.MsgFromSensor type msgToSensorC chan *central.IssueLocalScannerCertsResponse type certificateRequesterImpl struct { - stopC concurrency.ErrorSignal + stopC concurrency.ErrorSignal msgFromSensorC msgFromSensorC msgToSensorC msgToSensorC - requests sync.Map + requests sync.Map } func (m *certificateRequesterImpl) Start() { @@ -65,13 +66,13 @@ func (m *certificateRequesterImpl) forwardMessagesToSensor() { } func (m *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - certRequester := &certRequestSyncImpl{ - requestID: uuid.NewV4().String(), + request := &certRequestSyncImpl{ + requestID: uuid.NewV4().String(), msgFromSensorC: m.msgFromSensorC, - msgToSensorC: make(msgToSensorC), + msgToSensorC: make(msgToSensorC), } - m.requests.Store(certRequester.requestID, certRequester.msgToSensorC) - response, err := certRequester.requestCertificates(ctx) - m.requests.Delete(certRequester.requestID) + m.requests.Store(request.requestID, request.msgToSensorC) + response, err := request.requestCertificates(ctx) + m.requests.Delete(request.requestID) return response, err -} \ No newline at end of file +} diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 8b11d56d589a0..c77ffc45cc0b6 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -20,8 +20,8 @@ type certificateRequesterSuite struct { type fixture struct { msgFromSensorC msgFromSensorC - msgToSensorC msgToSensorC - requester CertificateRequester + msgToSensorC msgToSensorC + requester CertificateRequester } func newFixture() *fixture { From 710069e5716b9f0456174c45a8130ec3841c96cb Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 16:15:52 +0100 Subject: [PATCH 5/6] add tests for wrong request ids in response, and concurrent requests --- .../localscanner/certificate_request.go | 2 +- .../localscanner/certificate_requester.go | 3 + .../certificate_requester_test.go | 131 ++++++++++++------ 3 files changed, 91 insertions(+), 45 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_request.go b/sensor/kubernetes/localscanner/certificate_request.go index abe7c9910ec08..445ea73e4beec 100644 --- a/sensor/kubernetes/localscanner/certificate_request.go +++ b/sensor/kubernetes/localscanner/certificate_request.go @@ -33,7 +33,7 @@ func (i *certRequestSyncImpl) requestCertificates(ctx context.Context) (*central case <-ctx.Done(): return nil, ctx.Err() case i.msgFromSensorC <- msg: - log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + log.Debugf("request to issue local Scanner certificates sent to Central successfully: %v", msg) } var response *central.IssueLocalScannerCertsResponse diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 8409bad9f49f5..d8956ea750c27 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -60,6 +60,9 @@ func (m *certificateRequesterImpl) forwardMessagesToSensor() { requestC, ok := m.requests.Load(msg.GetRequestId()) if ok { requestC.(msgToSensorC) <- msg + } else { + log.Debugf("request ID %q does not match any known request ID, skipping request", + msg.GetRequestId()) // FIXME debug } } } diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index c77ffc45cc0b6..42ed590e4d525 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -16,83 +16,126 @@ func TestCertificateRequester(t *testing.T) { type certificateRequesterSuite struct { suite.Suite -} - -type fixture struct { msgFromSensorC msgFromSensorC msgToSensorC msgToSensorC requester CertificateRequester } -func newFixture() *fixture { - msgFromSensorC := make(msgFromSensorC) - msgToSensorC := make(msgToSensorC) - return &fixture{ - msgFromSensorC: msgFromSensorC, - msgToSensorC: msgToSensorC, - requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), - } +func (s *certificateRequesterSuite) SetupTest() { + s.msgFromSensorC = make(msgFromSensorC) + s.msgToSensorC = make(msgToSensorC) + s.requester = NewCertificateRequester(s.msgFromSensorC, s.msgToSensorC) + s.requester.Start() } -func (s *certificateRequesterSuite) TestRequestCancellation() { - f := newFixture() - f.requester.Start() // FIXME don't start and this is much simpler - defer f.requester.Stop() +func (s *certificateRequesterSuite) TearDownTest() { + s.requester.Stop() +} +func (s *certificateRequesterSuite) TestRequestCancellation() { requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) doneErrSig := concurrency.NewErrorSignal() go func() { - certs, err := f.requester.RequestCertificates(requestCtx) + certs, err := s.requester.RequestCertificates(requestCtx) s.Nil(certs) doneErrSig.SignalWithError(err) }() cancelRequestCtx() - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) - defer cancelWaitCtx() - requestErr, ok := doneErrSig.WaitUntil(waitCtx) + requestErr, ok := doneErrSig.WaitWithTimeout(100 * time.Millisecond) s.Require().True(ok) s.Equal(context.Canceled, requestErr) } func (s *certificateRequesterSuite) TestRequestSuccess() { - f := newFixture() - f.requester.Start() - defer f.requester.Stop() - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancelWaitCtx() - doneErrSig := concurrency.NewErrorSignal() - expectedResponseC := make(chan *central.IssueLocalScannerCertsResponse) + responseC := make(msgToSensorC) + var interceptedRequestID string go func() { - response, err := f.requester.RequestCertificates(waitCtx) - expectedResponse := <-expectedResponseC - s.Equal(expectedResponse, response) - s.Nil(err) - doneErrSig.Signal() + select { + case <-waitCtx.Done(): + return + case request := <-s.msgFromSensorC: + interceptedRequestID = request.GetIssueLocalScannerCertsRequest().GetRequestId() + s.NotEmpty(interceptedRequestID) + s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ + RequestId: interceptedRequestID, + } + } }() + go func() { + response, err := s.requester.RequestCertificates(waitCtx) + s.NoError(err) + responseC <- response + }() + + select { + case response := <-responseC: + s.Equal(interceptedRequestID, response.GetRequestId()) + case <-waitCtx.Done(): + s.Require().Fail("timeout reached") + } +} + +func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancelWaitCtx() + doneErrSig := concurrency.NewErrorSignal() + go func() { select { case <-waitCtx.Done(): - return - case request := <-f.msgFromSensorC: - s.Require().NotNil(request.GetIssueLocalScannerCertsRequest()) - requestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() - s.Require().NotEmpty(requestID) - // should be ignored. - f.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ - RequestId: "", - } - expectedResponse := ¢ral.IssueLocalScannerCertsResponse{ - RequestId: requestID, + case <-s.msgFromSensorC: + select { + case <-waitCtx.Done(): + // Request with different request ID should be ignored. + case s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{RequestId: ""}: } - f.msgToSensorC <- expectedResponse - expectedResponseC <- expectedResponse } }() - _, ok := doneErrSig.WaitUntil(waitCtx) + go func() { + certs, err := s.requester.RequestCertificates(waitCtx) + s.Nil(certs) + doneErrSig.SignalWithError(err) + }() + + requestErr, ok := doneErrSig.WaitWithTimeout(100 * time.Millisecond) + s.Require().True(ok) + s.Equal(context.DeadlineExceeded, requestErr) +} + +func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() { + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancelWaitCtx() + numConcurrentRequests := 3 + waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) + + for i := 0; i < numConcurrentRequests; i++ { + go func() { + select { + case <-waitCtx.Done(): + return + case request := <-s.msgFromSensorC: + interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() + s.NotEmpty(interceptedRequestID) + s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ + RequestId: interceptedRequestID, + } + } + }() + + go func() { + _, err := s.requester.RequestCertificates(waitCtx) + s.NoError(err) + waitGroup.Add(-1) + }() + } + + ok := concurrency.WaitWithTimeout(&waitGroup, 100*time.Millisecond) s.Require().True(ok) } From 573bd2887c55bdf5b8d3c141d28f6c1a189384a5 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 16:35:13 +0100 Subject: [PATCH 6/6] fix style --- pkg/sync/common_aliases.go | 3 +++ sensor/kubernetes/localscanner/certificate_requester.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/sync/common_aliases.go b/pkg/sync/common_aliases.go index 41ef90e6f8d98..44305380a8902 100644 --- a/pkg/sync/common_aliases.go +++ b/pkg/sync/common_aliases.go @@ -10,3 +10,6 @@ type WaitGroup = sync.WaitGroup // Locker is an alias for `sync.Locker`. type Locker = sync.Locker + +// Map is an alias for `sync.Map`. +type Map = sync.Map diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index d8956ea750c27..6e786b4dac904 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -2,11 +2,11 @@ package localscanner import ( "context" - "sync" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/logging" + "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/pkg/uuid" )