From ccd298a78ac70ca9b6703c24e893a78672c86800 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 26 Jan 2022 12:18:01 +0100 Subject: [PATCH 01/29] Create CertificateRequester to request certificates from central --- .../localscanner/certificate_requester.go | 68 ++++++++++++++ .../certificate_requester_test.go | 93 +++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 sensor/kubernetes/localscanner/certificate_requester.go create mode 100644 sensor/kubernetes/localscanner/certificate_requester_test.go diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go new file mode 100644 index 0000000000000..cf7af59cb819d --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -0,0 +1,68 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/logging" + "github.com/stackrox/rox/pkg/uuid" +) + +var ( + _ CertificateRequester = (*certRequesterSyncImpl)(nil) + log = logging.LoggerForModule() +) + +// CertificateRequester request a new set of local scanner certificates to central. +type CertificateRequester interface { + RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) +} + +// NewCertificateRequester creates a new CertificateRequester that communicates through +// the specified channels, and that uses a fresh request ID. +func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, + msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { + return &certRequesterSyncImpl{ + requestID: uuid.NewV4().String(), + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + } +} + +type certRequesterSyncImpl struct { + requestID string + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse +} + +func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: i.requestID, + }, + }, + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case i.msgFromSensorC <- msg: + log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + } + + var response *central.IssueLocalScannerCertsResponse + for response == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case newResponse := <-i.msgToSensorC: + if newResponse.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) + } else { + response = newResponse + } + } + } + + return response, nil +} \ No newline at end of file diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go new file mode 100644 index 0000000000000..deb272286f277 --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -0,0 +1,93 @@ +package localscanner + +import ( + "context" + "testing" + "time" + + "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/concurrency" + "github.com/stretchr/testify/suite" +) + +func TestHandler(t *testing.T) { + suite.Run(t, new(certificateRequesterSuite)) +} + +type certificateRequesterSuite struct { + suite.Suite +} + +type fixture struct { + msgFromSensorC chan *central.MsgFromSensor + msgToSensorC chan *central.IssueLocalScannerCertsResponse + requester CertificateRequester +} + +func newFixture() *fixture { + msgFromSensorC := make(chan *central.MsgFromSensor) + msgToSensorC := make(chan *central.IssueLocalScannerCertsResponse) + return &fixture{ + msgFromSensorC: msgFromSensorC, + msgToSensorC: msgToSensorC, + requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), + } +} + +func (s *certificateRequesterSuite) TestRequestCancellation() { + f := newFixture() + requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) + doneErrSig := concurrency.NewErrorSignal() + + go func() { + certs, err := f.requester.RequestCertificates(requestCtx) + s.Nil(certs) + doneErrSig.SignalWithError(err) + }() + cancelRequestCtx() + + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) + defer cancelWaitCtx() + requestErr, ok := doneErrSig.WaitUntil(waitCtx) + s.Require().True(ok) + s.Equal(context.Canceled, requestErr) +} + +func (s *certificateRequesterSuite) TestRequestSuccess() { + f := newFixture() + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) + defer cancelWaitCtx() + doneErrSig := concurrency.NewErrorSignal() + expectedResponseC := make(chan *central.IssueLocalScannerCertsResponse) + + go func() { + response, err := f.requester.RequestCertificates(waitCtx) + expectedResponse := <-expectedResponseC + s.Equal(expectedResponse, response) + s.Nil(err) + doneErrSig.Signal() + }() + + go func() { + select { + case <-waitCtx.Done(): + return + case request := <-f.msgFromSensorC: + s.Require().NotNil(request.GetIssueLocalScannerCertsRequest()) + requestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() + s.Require().NotEmpty(requestID) + // should be ignored. + f.msgToSensorC <-¢ral.IssueLocalScannerCertsResponse{ + RequestId: "", + } + expectedResponse := ¢ral.IssueLocalScannerCertsResponse{ + RequestId: requestID, + } + f.msgToSensorC <-expectedResponse + expectedResponseC <-expectedResponse + } + }() + + _, ok := doneErrSig.WaitUntil(waitCtx) + s.Require().True(ok) +} From 10cd58caac37eeeea245b4ac4dc2937e799bd3fa Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 26 Jan 2022 12:21:12 +0100 Subject: [PATCH 02/29] fix style --- .../kubernetes/localscanner/certificate_requester.go | 6 +++--- .../localscanner/certificate_requester_test.go | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index cf7af59cb819d..623f0d549c4af 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -9,8 +9,8 @@ import ( ) var ( - _ CertificateRequester = (*certRequesterSyncImpl)(nil) - log = logging.LoggerForModule() + _ CertificateRequester = (*certRequesterSyncImpl)(nil) + log = logging.LoggerForModule() ) // CertificateRequester request a new set of local scanner certificates to central. @@ -65,4 +65,4 @@ func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*centr } return response, nil -} \ No newline at end of file +} diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index deb272286f277..7a40fd7d695a1 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -29,8 +29,8 @@ func newFixture() *fixture { msgToSensorC := make(chan *central.IssueLocalScannerCertsResponse) return &fixture{ msgFromSensorC: msgFromSensorC, - msgToSensorC: msgToSensorC, - requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), + msgToSensorC: msgToSensorC, + requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), } } @@ -77,14 +77,14 @@ func (s *certificateRequesterSuite) TestRequestSuccess() { requestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() s.Require().NotEmpty(requestID) // should be ignored. - f.msgToSensorC <-¢ral.IssueLocalScannerCertsResponse{ + f.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ RequestId: "", } expectedResponse := ¢ral.IssueLocalScannerCertsResponse{ RequestId: requestID, } - f.msgToSensorC <-expectedResponse - expectedResponseC <-expectedResponse + f.msgToSensorC <- expectedResponse + expectedResponseC <- expectedResponse } }() From 308ca2f3d7ef8343b23538d27447a1b2704a0b1d Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 12:55:56 +0100 Subject: [PATCH 03/29] support concurrent requests --- .../localscanner/certificate_request.go | 54 +++++++++++++ .../localscanner/certificate_requester.go | 81 ++++++++++--------- .../certificate_requester_test.go | 17 ++-- 3 files changed, 110 insertions(+), 42 deletions(-) create mode 100644 sensor/kubernetes/localscanner/certificate_request.go diff --git a/sensor/kubernetes/localscanner/certificate_request.go b/sensor/kubernetes/localscanner/certificate_request.go new file mode 100644 index 0000000000000..839bef25a864d --- /dev/null +++ b/sensor/kubernetes/localscanner/certificate_request.go @@ -0,0 +1,54 @@ +package localscanner + +import ( + "context" + + "github.com/stackrox/rox/generated/internalapi/central" +) + +var ( + _ certificateRequest = (*certRequestSyncImpl)(nil) +) + +// certificateRequest request a new set of local scanner certificates to central. +type certificateRequest interface { + requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) +} + +type certRequestSyncImpl struct { + requestID string + msgFromSensorC msgFromSensorC + msgToSensorC msgToSensorC +} + +func (i *certRequestSyncImpl) requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: i.requestID, + }, + }, + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case i.msgFromSensorC <- msg: + log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + } + + var response *central.IssueLocalScannerCertsResponse + for response == nil { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case newResponse := <-i.msgToSensorC: + if newResponse.GetRequestId() != i.requestID { + log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) + } else { + response = newResponse + } + } + } + + return response, nil +} diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 623f0d549c4af..9df30dba898d9 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -2,67 +2,76 @@ package localscanner import ( "context" + "sync" "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/logging" "github.com/stackrox/rox/pkg/uuid" ) var ( - _ CertificateRequester = (*certRequesterSyncImpl)(nil) log = logging.LoggerForModule() + _ CertificateRequester = (*certificateRequesterImpl)(nil) ) // CertificateRequester request a new set of local scanner certificates to central. type CertificateRequester interface { + Start() + Stop() RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) } -// NewCertificateRequester creates a new CertificateRequester that communicates through -// the specified channels, and that uses a fresh request ID. -func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, - msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { - return &certRequesterSyncImpl{ - requestID: uuid.NewV4().String(), +// NewCertificateRequester creates a new certificateRequest manager that communicates through +// the specified channels, and that uses a fresh request ID for reach new request. +// TODO document this handles concurrent requests from several goroutines +func NewCertificateRequester(msgFromSensorC msgFromSensorC, msgToSensorC msgToSensorC) CertificateRequester { + return &certificateRequesterImpl{ + stopC: concurrency.NewErrorSignal(), msgFromSensorC: msgFromSensorC, msgToSensorC: msgToSensorC, } } -type certRequesterSyncImpl struct { - requestID string - msgFromSensorC chan *central.MsgFromSensor - msgToSensorC chan *central.IssueLocalScannerCertsResponse +type msgFromSensorC chan *central.MsgFromSensor +type msgToSensorC chan *central.IssueLocalScannerCertsResponse +type certificateRequesterImpl struct { + stopC concurrency.ErrorSignal + msgFromSensorC msgFromSensorC + msgToSensorC msgToSensorC + requests sync.Map } -func (i *certRequesterSyncImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - msg := ¢ral.MsgFromSensor{ - Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ - IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ - RequestId: i.requestID, - }, - }, - } - select { - case <-ctx.Done(): - return nil, ctx.Err() - case i.msgFromSensorC <- msg: - log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) - } +func (m *certificateRequesterImpl) Start() { + go m.forwardMessagesToSensor() +} + +func (m *certificateRequesterImpl) Stop() { + m.stopC.Signal() +} - var response *central.IssueLocalScannerCertsResponse - for response == nil { +func (m *certificateRequesterImpl) forwardMessagesToSensor() { + for { select { - case <-ctx.Done(): - return nil, ctx.Err() - case newResponse := <-i.msgToSensorC: - if newResponse.GetRequestId() != i.requestID { - log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) - } else { - response = newResponse + case <-m.stopC.Done(): + return + case msg := <-m.msgToSensorC: + requestC, ok := m.requests.Load(msg.GetRequestId()) + if ok { + requestC.(msgToSensorC) <- msg } } } - - return response, nil } + +func (m *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + certRequester := &certRequestSyncImpl{ + requestID: uuid.NewV4().String(), + msgFromSensorC: m.msgFromSensorC, + msgToSensorC: make(msgToSensorC), + } + m.requests.Store(certRequester.requestID, certRequester.msgToSensorC) + response, err := certRequester.requestCertificates(ctx) + m.requests.Delete(certRequester.requestID) + return response, err +} \ No newline at end of file diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 7a40fd7d695a1..8b11d56d589a0 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/suite" ) -func TestHandler(t *testing.T) { +func TestCertificateRequester(t *testing.T) { suite.Run(t, new(certificateRequesterSuite)) } @@ -19,14 +19,14 @@ type certificateRequesterSuite struct { } type fixture struct { - msgFromSensorC chan *central.MsgFromSensor - msgToSensorC chan *central.IssueLocalScannerCertsResponse - requester CertificateRequester + msgFromSensorC msgFromSensorC + msgToSensorC msgToSensorC + requester CertificateRequester } func newFixture() *fixture { - msgFromSensorC := make(chan *central.MsgFromSensor) - msgToSensorC := make(chan *central.IssueLocalScannerCertsResponse) + msgFromSensorC := make(msgFromSensorC) + msgToSensorC := make(msgToSensorC) return &fixture{ msgFromSensorC: msgFromSensorC, msgToSensorC: msgToSensorC, @@ -36,6 +36,9 @@ func newFixture() *fixture { func (s *certificateRequesterSuite) TestRequestCancellation() { f := newFixture() + f.requester.Start() // FIXME don't start and this is much simpler + defer f.requester.Stop() + requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) doneErrSig := concurrency.NewErrorSignal() @@ -55,6 +58,8 @@ func (s *certificateRequesterSuite) TestRequestCancellation() { func (s *certificateRequesterSuite) TestRequestSuccess() { f := newFixture() + f.requester.Start() + defer f.requester.Stop() waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) defer cancelWaitCtx() doneErrSig := concurrency.NewErrorSignal() From 0b561e446136bc49e517e0dcbc7e39f4ac8dc433 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 14:57:30 +0100 Subject: [PATCH 04/29] improve comments and logging --- .../localscanner/certificate_request.go | 3 ++- .../localscanner/certificate_requester.go | 27 ++++++++++--------- .../certificate_requester_test.go | 4 +-- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_request.go b/sensor/kubernetes/localscanner/certificate_request.go index 839bef25a864d..abe7c9910ec08 100644 --- a/sensor/kubernetes/localscanner/certificate_request.go +++ b/sensor/kubernetes/localscanner/certificate_request.go @@ -43,7 +43,8 @@ func (i *certRequestSyncImpl) requestCertificates(ctx context.Context) (*central return nil, ctx.Err() case newResponse := <-i.msgToSensorC: if newResponse.GetRequestId() != i.requestID { - log.Debugf("ignoring response with unknown request id %q", response.GetRequestId()) + log.Debugf("request id %q does not match %q, skipping request", response.GetRequestId(), + i.requestID) } else { response = newResponse } diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 9df30dba898d9..8409bad9f49f5 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -22,12 +22,13 @@ type CertificateRequester interface { RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) } -// NewCertificateRequester creates a new certificateRequest manager that communicates through -// the specified channels, and that uses a fresh request ID for reach new request. -// TODO document this handles concurrent requests from several goroutines +// NewCertificateRequester creates a new certificate requester that communicates through +// the specified channels and initializes a new request ID for reach request. +// To use it call Start, and then make requests with RequestCertificates, concurrent requests are supported. +// This assumes that the certificate requester is the only consumer of msgToSensorC. func NewCertificateRequester(msgFromSensorC msgFromSensorC, msgToSensorC msgToSensorC) CertificateRequester { return &certificateRequesterImpl{ - stopC: concurrency.NewErrorSignal(), + stopC: concurrency.NewErrorSignal(), msgFromSensorC: msgFromSensorC, msgToSensorC: msgToSensorC, } @@ -36,10 +37,10 @@ func NewCertificateRequester(msgFromSensorC msgFromSensorC, msgToSensorC msgToSe type msgFromSensorC chan *central.MsgFromSensor type msgToSensorC chan *central.IssueLocalScannerCertsResponse type certificateRequesterImpl struct { - stopC concurrency.ErrorSignal + stopC concurrency.ErrorSignal msgFromSensorC msgFromSensorC msgToSensorC msgToSensorC - requests sync.Map + requests sync.Map } func (m *certificateRequesterImpl) Start() { @@ -65,13 +66,13 @@ func (m *certificateRequesterImpl) forwardMessagesToSensor() { } func (m *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - certRequester := &certRequestSyncImpl{ - requestID: uuid.NewV4().String(), + request := &certRequestSyncImpl{ + requestID: uuid.NewV4().String(), msgFromSensorC: m.msgFromSensorC, - msgToSensorC: make(msgToSensorC), + msgToSensorC: make(msgToSensorC), } - m.requests.Store(certRequester.requestID, certRequester.msgToSensorC) - response, err := certRequester.requestCertificates(ctx) - m.requests.Delete(certRequester.requestID) + m.requests.Store(request.requestID, request.msgToSensorC) + response, err := request.requestCertificates(ctx) + m.requests.Delete(request.requestID) return response, err -} \ No newline at end of file +} diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 8b11d56d589a0..c77ffc45cc0b6 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -20,8 +20,8 @@ type certificateRequesterSuite struct { type fixture struct { msgFromSensorC msgFromSensorC - msgToSensorC msgToSensorC - requester CertificateRequester + msgToSensorC msgToSensorC + requester CertificateRequester } func newFixture() *fixture { From 35baa5b8957ab374a8a5631e61a92f071b225edc Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 16:15:52 +0100 Subject: [PATCH 05/29] add tests for wrong request ids in response, and concurrent requests --- .../localscanner/certificate_request.go | 2 +- .../localscanner/certificate_requester.go | 3 + .../certificate_requester_test.go | 131 ++++++++++++------ 3 files changed, 91 insertions(+), 45 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_request.go b/sensor/kubernetes/localscanner/certificate_request.go index abe7c9910ec08..445ea73e4beec 100644 --- a/sensor/kubernetes/localscanner/certificate_request.go +++ b/sensor/kubernetes/localscanner/certificate_request.go @@ -33,7 +33,7 @@ func (i *certRequestSyncImpl) requestCertificates(ctx context.Context) (*central case <-ctx.Done(): return nil, ctx.Err() case i.msgFromSensorC <- msg: - log.Debugf("request to issue local Scanner certificates sent to Central succesfully: %v", msg) + log.Debugf("request to issue local Scanner certificates sent to Central successfully: %v", msg) } var response *central.IssueLocalScannerCertsResponse diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 8409bad9f49f5..d8956ea750c27 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -60,6 +60,9 @@ func (m *certificateRequesterImpl) forwardMessagesToSensor() { requestC, ok := m.requests.Load(msg.GetRequestId()) if ok { requestC.(msgToSensorC) <- msg + } else { + log.Debugf("request ID %q does not match any known request ID, skipping request", + msg.GetRequestId()) // FIXME debug } } } diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index c77ffc45cc0b6..42ed590e4d525 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -16,83 +16,126 @@ func TestCertificateRequester(t *testing.T) { type certificateRequesterSuite struct { suite.Suite -} - -type fixture struct { msgFromSensorC msgFromSensorC msgToSensorC msgToSensorC requester CertificateRequester } -func newFixture() *fixture { - msgFromSensorC := make(msgFromSensorC) - msgToSensorC := make(msgToSensorC) - return &fixture{ - msgFromSensorC: msgFromSensorC, - msgToSensorC: msgToSensorC, - requester: NewCertificateRequester(msgFromSensorC, msgToSensorC), - } +func (s *certificateRequesterSuite) SetupTest() { + s.msgFromSensorC = make(msgFromSensorC) + s.msgToSensorC = make(msgToSensorC) + s.requester = NewCertificateRequester(s.msgFromSensorC, s.msgToSensorC) + s.requester.Start() } -func (s *certificateRequesterSuite) TestRequestCancellation() { - f := newFixture() - f.requester.Start() // FIXME don't start and this is much simpler - defer f.requester.Stop() +func (s *certificateRequesterSuite) TearDownTest() { + s.requester.Stop() +} +func (s *certificateRequesterSuite) TestRequestCancellation() { requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) doneErrSig := concurrency.NewErrorSignal() go func() { - certs, err := f.requester.RequestCertificates(requestCtx) + certs, err := s.requester.RequestCertificates(requestCtx) s.Nil(certs) doneErrSig.SignalWithError(err) }() cancelRequestCtx() - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) - defer cancelWaitCtx() - requestErr, ok := doneErrSig.WaitUntil(waitCtx) + requestErr, ok := doneErrSig.WaitWithTimeout(100 * time.Millisecond) s.Require().True(ok) s.Equal(context.Canceled, requestErr) } func (s *certificateRequesterSuite) TestRequestSuccess() { - f := newFixture() - f.requester.Start() - defer f.requester.Stop() - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), time.Second) + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancelWaitCtx() - doneErrSig := concurrency.NewErrorSignal() - expectedResponseC := make(chan *central.IssueLocalScannerCertsResponse) + responseC := make(msgToSensorC) + var interceptedRequestID string go func() { - response, err := f.requester.RequestCertificates(waitCtx) - expectedResponse := <-expectedResponseC - s.Equal(expectedResponse, response) - s.Nil(err) - doneErrSig.Signal() + select { + case <-waitCtx.Done(): + return + case request := <-s.msgFromSensorC: + interceptedRequestID = request.GetIssueLocalScannerCertsRequest().GetRequestId() + s.NotEmpty(interceptedRequestID) + s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ + RequestId: interceptedRequestID, + } + } }() + go func() { + response, err := s.requester.RequestCertificates(waitCtx) + s.NoError(err) + responseC <- response + }() + + select { + case response := <-responseC: + s.Equal(interceptedRequestID, response.GetRequestId()) + case <-waitCtx.Done(): + s.Require().Fail("timeout reached") + } +} + +func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancelWaitCtx() + doneErrSig := concurrency.NewErrorSignal() + go func() { select { case <-waitCtx.Done(): - return - case request := <-f.msgFromSensorC: - s.Require().NotNil(request.GetIssueLocalScannerCertsRequest()) - requestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() - s.Require().NotEmpty(requestID) - // should be ignored. - f.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ - RequestId: "", - } - expectedResponse := ¢ral.IssueLocalScannerCertsResponse{ - RequestId: requestID, + case <-s.msgFromSensorC: + select { + case <-waitCtx.Done(): + // Request with different request ID should be ignored. + case s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{RequestId: ""}: } - f.msgToSensorC <- expectedResponse - expectedResponseC <- expectedResponse } }() - _, ok := doneErrSig.WaitUntil(waitCtx) + go func() { + certs, err := s.requester.RequestCertificates(waitCtx) + s.Nil(certs) + doneErrSig.SignalWithError(err) + }() + + requestErr, ok := doneErrSig.WaitWithTimeout(100 * time.Millisecond) + s.Require().True(ok) + s.Equal(context.DeadlineExceeded, requestErr) +} + +func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() { + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancelWaitCtx() + numConcurrentRequests := 3 + waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) + + for i := 0; i < numConcurrentRequests; i++ { + go func() { + select { + case <-waitCtx.Done(): + return + case request := <-s.msgFromSensorC: + interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() + s.NotEmpty(interceptedRequestID) + s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ + RequestId: interceptedRequestID, + } + } + }() + + go func() { + _, err := s.requester.RequestCertificates(waitCtx) + s.NoError(err) + waitGroup.Add(-1) + }() + } + + ok := concurrency.WaitWithTimeout(&waitGroup, 100*time.Millisecond) s.Require().True(ok) } From d05ff9c7dbaeff13248616f77236eff3fe0de97c Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 27 Jan 2022 16:35:13 +0100 Subject: [PATCH 06/29] fix style --- pkg/sync/common_aliases.go | 3 +++ sensor/kubernetes/localscanner/certificate_requester.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/sync/common_aliases.go b/pkg/sync/common_aliases.go index 41ef90e6f8d98..44305380a8902 100644 --- a/pkg/sync/common_aliases.go +++ b/pkg/sync/common_aliases.go @@ -10,3 +10,6 @@ type WaitGroup = sync.WaitGroup // Locker is an alias for `sync.Locker`. type Locker = sync.Locker + +// Map is an alias for `sync.Map`. +type Map = sync.Map diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index d8956ea750c27..6e786b4dac904 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -2,11 +2,11 @@ package localscanner import ( "context" - "sync" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/logging" + "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/pkg/uuid" ) From 4c159de9b983b8b9bfa33208192c24354752ede5 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 28 Jan 2022 12:59:29 +0100 Subject: [PATCH 07/29] simplify code removing certRequestSyncImpl --- .../localscanner/certificate_request.go | 55 ---------------- .../localscanner/certificate_requester.go | 65 ++++++++++++++----- 2 files changed, 47 insertions(+), 73 deletions(-) delete mode 100644 sensor/kubernetes/localscanner/certificate_request.go diff --git a/sensor/kubernetes/localscanner/certificate_request.go b/sensor/kubernetes/localscanner/certificate_request.go deleted file mode 100644 index 445ea73e4beec..0000000000000 --- a/sensor/kubernetes/localscanner/certificate_request.go +++ /dev/null @@ -1,55 +0,0 @@ -package localscanner - -import ( - "context" - - "github.com/stackrox/rox/generated/internalapi/central" -) - -var ( - _ certificateRequest = (*certRequestSyncImpl)(nil) -) - -// certificateRequest request a new set of local scanner certificates to central. -type certificateRequest interface { - requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) -} - -type certRequestSyncImpl struct { - requestID string - msgFromSensorC msgFromSensorC - msgToSensorC msgToSensorC -} - -func (i *certRequestSyncImpl) requestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - msg := ¢ral.MsgFromSensor{ - Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ - IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ - RequestId: i.requestID, - }, - }, - } - select { - case <-ctx.Done(): - return nil, ctx.Err() - case i.msgFromSensorC <- msg: - log.Debugf("request to issue local Scanner certificates sent to Central successfully: %v", msg) - } - - var response *central.IssueLocalScannerCertsResponse - for response == nil { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case newResponse := <-i.msgToSensorC: - if newResponse.GetRequestId() != i.requestID { - log.Debugf("request id %q does not match %q, skipping request", response.GetRequestId(), - i.requestID) - } else { - response = newResponse - } - } - } - - return response, nil -} diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 6e786b4dac904..3d82e4fae21ad 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -43,39 +43,68 @@ type certificateRequesterImpl struct { requests sync.Map } -func (m *certificateRequesterImpl) Start() { - go m.forwardMessagesToSensor() +func (r *certificateRequesterImpl) Start() { + go r.forwardMessagesToSensor() } -func (m *certificateRequesterImpl) Stop() { - m.stopC.Signal() +func (r *certificateRequesterImpl) Stop() { + r.stopC.Signal() } -func (m *certificateRequesterImpl) forwardMessagesToSensor() { +func (r *certificateRequesterImpl) forwardMessagesToSensor() { for { select { - case <-m.stopC.Done(): + case <-r.stopC.Done(): return - case msg := <-m.msgToSensorC: - requestC, ok := m.requests.Load(msg.GetRequestId()) + case msg := <-r.msgToSensorC: + requestC, ok := r.requests.Load(msg.GetRequestId()) if ok { requestC.(msgToSensorC) <- msg } else { log.Debugf("request ID %q does not match any known request ID, skipping request", - msg.GetRequestId()) // FIXME debug + msg.GetRequestId()) } } } } -func (m *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - request := &certRequestSyncImpl{ - requestID: uuid.NewV4().String(), - msgFromSensorC: m.msgFromSensorC, - msgToSensorC: make(msgToSensorC), +func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { + requestID, requestErr := r.send(ctx) + if requestErr != nil { + return nil, requestErr + } + + msgToSensorC := make(msgToSensorC) + r.requests.Store(requestID, msgToSensorC) + response, responseErr := receive(ctx, msgToSensorC) + r.requests.Delete(requestID) + + return response, responseErr +} + +func (r *certificateRequesterImpl) send(ctx context.Context) (requestID string, err error) { + requestID = uuid.NewV4().String() + msg := ¢ral.MsgFromSensor{ + Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ + IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ + RequestId: requestID, + }, + }, + } + select { + case <-ctx.Done(): + return "", ctx.Err() + case r.msgFromSensorC <- msg: + log.Debugf("request to issue local Scanner certificates sent to Central successfully: %v", msg) + return requestID, nil + } +} + +func receive(ctx context.Context, msgToSensorC msgToSensorC) (*central.IssueLocalScannerCertsResponse, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case response := <-msgToSensorC: + return response, nil } - m.requests.Store(request.requestID, request.msgToSensorC) - response, err := request.requestCertificates(ctx) - m.requests.Delete(request.requestID) - return response, err } From e5cfae85cc66ce1dc04e60244fce271d3d12f3cf Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 28 Jan 2022 13:54:48 +0100 Subject: [PATCH 08/29] do not send request until prepared to receive response --- .../localscanner/certificate_requester.go | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 3d82e4fae21ad..56b64db459505 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -69,21 +69,18 @@ func (r *certificateRequesterImpl) forwardMessagesToSensor() { } func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { - requestID, requestErr := r.send(ctx) - if requestErr != nil { - return nil, requestErr - } - - msgToSensorC := make(msgToSensorC) - r.requests.Store(requestID, msgToSensorC) - response, responseErr := receive(ctx, msgToSensorC) - r.requests.Delete(requestID) + requestID := uuid.NewV4().String() + receiveC := make(msgToSensorC) + r.requests.Store(requestID, receiveC) + defer r.requests.Delete(requestID) - return response, responseErr + if err := r.send(ctx, requestID); err != nil { + return nil, err + } + return receive(ctx, receiveC) } -func (r *certificateRequesterImpl) send(ctx context.Context) (requestID string, err error) { - requestID = uuid.NewV4().String() +func (r *certificateRequesterImpl) send(ctx context.Context, requestID string) error { msg := ¢ral.MsgFromSensor{ Msg: ¢ral.MsgFromSensor_IssueLocalScannerCertsRequest{ IssueLocalScannerCertsRequest: ¢ral.IssueLocalScannerCertsRequest{ @@ -93,10 +90,10 @@ func (r *certificateRequesterImpl) send(ctx context.Context) (requestID string, } select { case <-ctx.Done(): - return "", ctx.Err() + return ctx.Err() case r.msgFromSensorC <- msg: log.Debugf("request to issue local Scanner certificates sent to Central successfully: %v", msg) - return requestID, nil + return nil } } From 94149b54cc628b5440f51c02379428dcb2279d9e Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 28 Jan 2022 14:28:49 +0100 Subject: [PATCH 09/29] remove type alias and rename channel fields --- .../localscanner/certificate_requester.go | 31 ++++++------- .../certificate_requester_test.go | 46 ++++++++----------- 2 files changed, 33 insertions(+), 44 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 56b64db459505..2f94cd720a40e 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -25,22 +25,21 @@ type CertificateRequester interface { // NewCertificateRequester creates a new certificate requester that communicates through // the specified channels and initializes a new request ID for reach request. // To use it call Start, and then make requests with RequestCertificates, concurrent requests are supported. -// This assumes that the certificate requester is the only consumer of msgToSensorC. -func NewCertificateRequester(msgFromSensorC msgFromSensorC, msgToSensorC msgToSensorC) CertificateRequester { +// This assumes that the certificate requester is the only consumer of receiveC. +func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, + msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { return &certificateRequesterImpl{ - stopC: concurrency.NewErrorSignal(), - msgFromSensorC: msgFromSensorC, - msgToSensorC: msgToSensorC, + stopC: concurrency.NewErrorSignal(), + sendC: msgFromSensorC, + receiveC: msgToSensorC, } } -type msgFromSensorC chan *central.MsgFromSensor -type msgToSensorC chan *central.IssueLocalScannerCertsResponse type certificateRequesterImpl struct { - stopC concurrency.ErrorSignal - msgFromSensorC msgFromSensorC - msgToSensorC msgToSensorC - requests sync.Map + stopC concurrency.ErrorSignal + sendC chan *central.MsgFromSensor + receiveC chan *central.IssueLocalScannerCertsResponse + requests sync.Map } func (r *certificateRequesterImpl) Start() { @@ -56,10 +55,10 @@ func (r *certificateRequesterImpl) forwardMessagesToSensor() { select { case <-r.stopC.Done(): return - case msg := <-r.msgToSensorC: + case msg := <-r.receiveC: requestC, ok := r.requests.Load(msg.GetRequestId()) if ok { - requestC.(msgToSensorC) <- msg + requestC.(chan *central.IssueLocalScannerCertsResponse) <- msg } else { log.Debugf("request ID %q does not match any known request ID, skipping request", msg.GetRequestId()) @@ -70,7 +69,7 @@ func (r *certificateRequesterImpl) forwardMessagesToSensor() { func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { requestID := uuid.NewV4().String() - receiveC := make(msgToSensorC) + receiveC := make(chan *central.IssueLocalScannerCertsResponse) r.requests.Store(requestID, receiveC) defer r.requests.Delete(requestID) @@ -91,13 +90,13 @@ func (r *certificateRequesterImpl) send(ctx context.Context, requestID string) e select { case <-ctx.Done(): return ctx.Err() - case r.msgFromSensorC <- msg: + case r.sendC <- msg: log.Debugf("request to issue local Scanner certificates sent to Central successfully: %v", msg) return nil } } -func receive(ctx context.Context, msgToSensorC msgToSensorC) (*central.IssueLocalScannerCertsResponse, error) { +func receive(ctx context.Context, msgToSensorC chan *central.IssueLocalScannerCertsResponse) (*central.IssueLocalScannerCertsResponse, error) { select { case <-ctx.Done(): return nil, ctx.Err() diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 42ed590e4d525..f51a7a60b2f35 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -16,15 +16,15 @@ func TestCertificateRequester(t *testing.T) { type certificateRequesterSuite struct { suite.Suite - msgFromSensorC msgFromSensorC - msgToSensorC msgToSensorC - requester CertificateRequester + sendC chan *central.MsgFromSensor + receiveC chan *central.IssueLocalScannerCertsResponse + requester CertificateRequester } func (s *certificateRequesterSuite) SetupTest() { - s.msgFromSensorC = make(msgFromSensorC) - s.msgToSensorC = make(msgToSensorC) - s.requester = NewCertificateRequester(s.msgFromSensorC, s.msgToSensorC) + s.sendC = make(chan *central.MsgFromSensor) + s.receiveC = make(chan *central.IssueLocalScannerCertsResponse) + s.requester = NewCertificateRequester(s.sendC, s.receiveC) s.requester.Start() } @@ -52,33 +52,23 @@ func (s *certificateRequesterSuite) TestRequestSuccess() { waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancelWaitCtx() - responseC := make(msgToSensorC) var interceptedRequestID string go func() { select { case <-waitCtx.Done(): return - case request := <-s.msgFromSensorC: + case request := <-s.sendC: interceptedRequestID = request.GetIssueLocalScannerCertsRequest().GetRequestId() s.NotEmpty(interceptedRequestID) - s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ + s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{ RequestId: interceptedRequestID, } } }() - go func() { - response, err := s.requester.RequestCertificates(waitCtx) - s.NoError(err) - responseC <- response - }() - - select { - case response := <-responseC: - s.Equal(interceptedRequestID, response.GetRequestId()) - case <-waitCtx.Done(): - s.Require().Fail("timeout reached") - } + response, err := s.requester.RequestCertificates(waitCtx) + s.NoError(err) + s.Equal(interceptedRequestID, response.GetRequestId()) } func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { @@ -89,11 +79,11 @@ func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { go func() { select { case <-waitCtx.Done(): - case <-s.msgFromSensorC: + case <-s.sendC: select { case <-waitCtx.Done(): // Request with different request ID should be ignored. - case s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{RequestId: ""}: + case s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{RequestId: ""}: } } }() @@ -110,9 +100,9 @@ func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { } func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() { - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 50*time.Millisecond) + waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 1*time.Second) defer cancelWaitCtx() - numConcurrentRequests := 3 + numConcurrentRequests := 2 waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) for i := 0; i < numConcurrentRequests; i++ { @@ -120,10 +110,10 @@ func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() select { case <-waitCtx.Done(): return - case request := <-s.msgFromSensorC: + case request := <-s.sendC: interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() s.NotEmpty(interceptedRequestID) - s.msgToSensorC <- ¢ral.IssueLocalScannerCertsResponse{ + s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{ RequestId: interceptedRequestID, } } @@ -136,6 +126,6 @@ func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() }() } - ok := concurrency.WaitWithTimeout(&waitGroup, 100*time.Millisecond) + ok := concurrency.WaitWithTimeout(&waitGroup, 2*time.Second) s.Require().True(ok) } From 83e8c5a2fbbd79068cbab0f492932d6f24371b9d Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 28 Jan 2022 15:50:43 +0100 Subject: [PATCH 10/29] prevent deadlock on request cancellation --- .../kubernetes/localscanner/certificate_requester.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 2f94cd720a40e..eac0797883b42 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -57,7 +57,11 @@ func (r *certificateRequesterImpl) forwardMessagesToSensor() { return case msg := <-r.receiveC: requestC, ok := r.requests.Load(msg.GetRequestId()) + r.requests.Delete(msg.GetRequestId()) if ok { + // doesn't block even if the corresponding call to RequestCertificates is cancelled and no one + // ever reads this, because requestC has buffer of 1, and we removed it from `r.request` above, + // in case we get more than 1 response for `msg.GetRequestId()`. requestC.(chan *central.IssueLocalScannerCertsResponse) <- msg } else { log.Debugf("request ID %q does not match any known request ID, skipping request", @@ -69,8 +73,10 @@ func (r *certificateRequesterImpl) forwardMessagesToSensor() { func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { requestID := uuid.NewV4().String() - receiveC := make(chan *central.IssueLocalScannerCertsResponse) + receiveC := make(chan *central.IssueLocalScannerCertsResponse, 1) r.requests.Store(requestID, receiveC) + // always delete this entry when leaving this scope to account for requests that are never responded, to avoid + // having entries in `r.requests` that are never removed. defer r.requests.Delete(requestID) if err := r.send(ctx, requestID); err != nil { @@ -96,11 +102,11 @@ func (r *certificateRequesterImpl) send(ctx context.Context, requestID string) e } } -func receive(ctx context.Context, msgToSensorC chan *central.IssueLocalScannerCertsResponse) (*central.IssueLocalScannerCertsResponse, error) { +func receive(ctx context.Context, receiveC chan *central.IssueLocalScannerCertsResponse) (*central.IssueLocalScannerCertsResponse, error) { select { case <-ctx.Done(): return nil, ctx.Err() - case response := <-msgToSensorC: + case response := <-receiveC: return response, nil } } From 76b916a85041b63bdac49bb468b7aeaf6d07c87b Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 28 Jan 2022 16:34:53 +0100 Subject: [PATCH 11/29] simplify tests --- .../localscanner/certificate_requester.go | 2 +- .../certificate_requester_test.go | 105 ++++++++---------- 2 files changed, 47 insertions(+), 60 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index eac0797883b42..725abbf0cc426 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -36,7 +36,7 @@ func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, } type certificateRequesterImpl struct { - stopC concurrency.ErrorSignal + stopC concurrency.ErrorSignal sendC chan *central.MsgFromSensor receiveC chan *central.IssueLocalScannerCertsResponse requests sync.Map diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index f51a7a60b2f35..f3bdf99f4d65e 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -2,6 +2,7 @@ package localscanner import ( "context" + "sync/atomic" "testing" "time" @@ -10,15 +11,20 @@ import ( "github.com/stretchr/testify/suite" ) +var ( + testTimeout = time.Second +) + func TestCertificateRequester(t *testing.T) { suite.Run(t, new(certificateRequesterSuite)) } type certificateRequesterSuite struct { suite.Suite - sendC chan *central.MsgFromSensor - receiveC chan *central.IssueLocalScannerCertsResponse - requester CertificateRequester + sendC chan *central.MsgFromSensor + receiveC chan *central.IssueLocalScannerCertsResponse + requester CertificateRequester + interceptedRequestID atomic.Value } func (s *certificateRequesterSuite) SetupTest() { @@ -43,89 +49,70 @@ func (s *certificateRequesterSuite) TestRequestCancellation() { }() cancelRequestCtx() - requestErr, ok := doneErrSig.WaitWithTimeout(100 * time.Millisecond) + requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) s.Require().True(ok) s.Equal(context.Canceled, requestErr) } func (s *certificateRequesterSuite) TestRequestSuccess() { - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancelWaitCtx() + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() - var interceptedRequestID string - go func() { - select { - case <-waitCtx.Done(): - return - case request := <-s.sendC: - interceptedRequestID = request.GetIssueLocalScannerCertsRequest().GetRequestId() - s.NotEmpty(interceptedRequestID) - s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{ - RequestId: interceptedRequestID, - } - } - }() + go s.respondRequest(ctx, "") - response, err := s.requester.RequestCertificates(waitCtx) + response, err := s.requester.RequestCertificates(ctx) s.NoError(err) - s.Equal(interceptedRequestID, response.GetRequestId()) + s.Equal(s.interceptedRequestID.Load(), response.GetRequestId()) } func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 10*time.Millisecond) - defer cancelWaitCtx() - doneErrSig := concurrency.NewErrorSignal() + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() - go func() { - select { - case <-waitCtx.Done(): - case <-s.sendC: - select { - case <-waitCtx.Done(): - // Request with different request ID should be ignored. - case s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{RequestId: ""}: - } - } - }() + // Request with different request ID should be ignored. + go s.respondRequest(ctx, "UNKNOWN") - go func() { - certs, err := s.requester.RequestCertificates(waitCtx) - s.Nil(certs) - doneErrSig.SignalWithError(err) - }() - - requestErr, ok := doneErrSig.WaitWithTimeout(100 * time.Millisecond) - s.Require().True(ok) + certs, requestErr := s.requester.RequestCertificates(ctx) + s.Nil(certs) s.Equal(context.DeadlineExceeded, requestErr) } func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() { - waitCtx, cancelWaitCtx := context.WithTimeout(context.Background(), 1*time.Second) - defer cancelWaitCtx() + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() numConcurrentRequests := 2 waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) for i := 0; i < numConcurrentRequests; i++ { - go func() { - select { - case <-waitCtx.Done(): - return - case request := <-s.sendC: - interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() - s.NotEmpty(interceptedRequestID) - s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{ - RequestId: interceptedRequestID, - } - } - }() + go s.respondRequest(ctx, "") go func() { - _, err := s.requester.RequestCertificates(waitCtx) + _, err := s.requester.RequestCertificates(ctx) s.NoError(err) waitGroup.Add(-1) }() } - ok := concurrency.WaitWithTimeout(&waitGroup, 2*time.Second) + ok := concurrency.WaitWithTimeout(&waitGroup, time.Duration(numConcurrentRequests)*testTimeout) s.Require().True(ok) } + +// respondRequest reads a request from `s.sendC` and responds with `responseRequestID` as the requestID, or with +// the same ID as the request if `responseRequestID` is "". +// Before sending the response, it stores in s.responseRequestID the request ID for the requests read from `s.sendC`. +func (s *certificateRequesterSuite) respondRequest(ctx context.Context, responseRequestID string) { + select { + case <-ctx.Done(): + case request := <-s.sendC: + interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() + s.NotEmpty(interceptedRequestID) + if responseRequestID != "" { + interceptedRequestID = responseRequestID + } + s.interceptedRequestID.Store(interceptedRequestID) + select { + case <-ctx.Done(): + case s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{RequestId: interceptedRequestID}: + } + } +} From 5166fb155cfa3ab8207b95352dcf188cb287277c Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 28 Jan 2022 17:28:15 +0100 Subject: [PATCH 12/29] improve code style --- .../localscanner/certificate_requester.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 725abbf0cc426..99f6c6268f0bd 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -43,30 +43,30 @@ type certificateRequesterImpl struct { } func (r *certificateRequesterImpl) Start() { - go r.forwardMessagesToSensor() + go r.dispatchResponses() } func (r *certificateRequesterImpl) Stop() { r.stopC.Signal() } -func (r *certificateRequesterImpl) forwardMessagesToSensor() { +func (r *certificateRequesterImpl) dispatchResponses() { for { select { case <-r.stopC.Done(): return case msg := <-r.receiveC: requestC, ok := r.requests.Load(msg.GetRequestId()) - r.requests.Delete(msg.GetRequestId()) - if ok { - // doesn't block even if the corresponding call to RequestCertificates is cancelled and no one - // ever reads this, because requestC has buffer of 1, and we removed it from `r.request` above, - // in case we get more than 1 response for `msg.GetRequestId()`. - requestC.(chan *central.IssueLocalScannerCertsResponse) <- msg - } else { + if !ok { log.Debugf("request ID %q does not match any known request ID, skipping request", msg.GetRequestId()) + continue } + r.requests.Delete(msg.GetRequestId()) + // doesn't block even if the corresponding call to RequestCertificates is cancelled and no one + // ever reads this, because requestC has buffer of 1, and we removed it from `r.request` above, + // in case we get more than 1 response for `msg.GetRequestId()`. + requestC.(chan *central.IssueLocalScannerCertsResponse) <- msg } } } From a89e59485fe8e52ed4effcc2a3d4b43d424133b5 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 28 Jan 2022 17:33:15 +0100 Subject: [PATCH 13/29] restrict channel directions --- .../localscanner/certificate_requester.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 99f6c6268f0bd..1f88e1a64294b 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -26,19 +26,19 @@ type CertificateRequester interface { // the specified channels and initializes a new request ID for reach request. // To use it call Start, and then make requests with RequestCertificates, concurrent requests are supported. // This assumes that the certificate requester is the only consumer of receiveC. -func NewCertificateRequester(msgFromSensorC chan *central.MsgFromSensor, - msgToSensorC chan *central.IssueLocalScannerCertsResponse) CertificateRequester { +func NewCertificateRequester(sendC chan<- *central.MsgFromSensor, + receiveC <-chan *central.IssueLocalScannerCertsResponse) CertificateRequester { return &certificateRequesterImpl{ stopC: concurrency.NewErrorSignal(), - sendC: msgFromSensorC, - receiveC: msgToSensorC, + sendC: sendC, + receiveC: receiveC, } } type certificateRequesterImpl struct { stopC concurrency.ErrorSignal - sendC chan *central.MsgFromSensor - receiveC chan *central.IssueLocalScannerCertsResponse + sendC chan<- *central.MsgFromSensor + receiveC <-chan *central.IssueLocalScannerCertsResponse requests sync.Map } @@ -102,7 +102,7 @@ func (r *certificateRequesterImpl) send(ctx context.Context, requestID string) e } } -func receive(ctx context.Context, receiveC chan *central.IssueLocalScannerCertsResponse) (*central.IssueLocalScannerCertsResponse, error) { +func receive(ctx context.Context, receiveC <-chan *central.IssueLocalScannerCertsResponse) (*central.IssueLocalScannerCertsResponse, error) { select { case <-ctx.Done(): return nil, ctx.Err() From b477c0ee0220ba1d2dd97384e8e81bfc73d1d52c Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 31 Jan 2022 11:14:16 +0100 Subject: [PATCH 14/29] improvements in test style --- .../certificate_requester_test.go | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index f3bdf99f4d65e..3720089acd10f 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -11,6 +11,10 @@ import ( "github.com/stretchr/testify/suite" ) +const ( + numConcurrentRequests = 2 +) + var ( testTimeout = time.Second ) @@ -58,7 +62,7 @@ func (s *certificateRequesterSuite) TestRequestSuccess() { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - go s.respondRequest(ctx, "") + go s.respondRequest(ctx, nil) response, err := s.requester.RequestCertificates(ctx) s.NoError(err) @@ -70,7 +74,7 @@ func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { defer cancel() // Request with different request ID should be ignored. - go s.respondRequest(ctx, "UNKNOWN") + go s.respondRequest(ctx, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) certs, requestErr := s.requester.RequestCertificates(ctx) s.Nil(certs) @@ -80,16 +84,15 @@ func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - numConcurrentRequests := 2 waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) for i := 0; i < numConcurrentRequests; i++ { - go s.respondRequest(ctx, "") + go s.respondRequest(ctx, nil) go func() { + defer waitGroup.Add(-1) _, err := s.requester.RequestCertificates(ctx) s.NoError(err) - waitGroup.Add(-1) }() } @@ -100,19 +103,22 @@ func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() // respondRequest reads a request from `s.sendC` and responds with `responseRequestID` as the requestID, or with // the same ID as the request if `responseRequestID` is "". // Before sending the response, it stores in s.responseRequestID the request ID for the requests read from `s.sendC`. -func (s *certificateRequesterSuite) respondRequest(ctx context.Context, responseRequestID string) { +func (s *certificateRequesterSuite) respondRequest(ctx context.Context, responseOverwrite *central.IssueLocalScannerCertsResponse) { select { case <-ctx.Done(): case request := <-s.sendC: interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() s.NotEmpty(interceptedRequestID) - if responseRequestID != "" { - interceptedRequestID = responseRequestID + var response *central.IssueLocalScannerCertsResponse + if responseOverwrite != nil { + response = responseOverwrite + } else { + response = ¢ral.IssueLocalScannerCertsResponse{RequestId: interceptedRequestID} } - s.interceptedRequestID.Store(interceptedRequestID) + s.interceptedRequestID.Store(response.GetRequestId()) select { case <-ctx.Done(): - case s.receiveC <- ¢ral.IssueLocalScannerCertsResponse{RequestId: interceptedRequestID}: + case s.receiveC <- response: } } } From 069beed9c2816447194a4e241b46988e3dc54c02 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 31 Jan 2022 12:41:13 +0100 Subject: [PATCH 15/29] improve comments --- sensor/kubernetes/localscanner/certificate_requester.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 1f88e1a64294b..86e175c3ec16d 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -15,7 +15,7 @@ var ( _ CertificateRequester = (*certificateRequesterImpl)(nil) ) -// CertificateRequester request a new set of local scanner certificates to central. +// CertificateRequester requests a new set of local scanner certificates from central. type CertificateRequester interface { Start() Stop() @@ -63,7 +63,7 @@ func (r *certificateRequesterImpl) dispatchResponses() { continue } r.requests.Delete(msg.GetRequestId()) - // doesn't block even if the corresponding call to RequestCertificates is cancelled and no one + // Doesn't block even if the corresponding call to RequestCertificates is cancelled and no one // ever reads this, because requestC has buffer of 1, and we removed it from `r.request` above, // in case we get more than 1 response for `msg.GetRequestId()`. requestC.(chan *central.IssueLocalScannerCertsResponse) <- msg @@ -75,7 +75,7 @@ func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*ce requestID := uuid.NewV4().String() receiveC := make(chan *central.IssueLocalScannerCertsResponse, 1) r.requests.Store(requestID, receiveC) - // always delete this entry when leaving this scope to account for requests that are never responded, to avoid + // Always delete this entry when leaving this scope to account for requests that are never responded, to avoid // having entries in `r.requests` that are never removed. defer r.requests.Delete(requestID) From c7f5bf1fa72397ec1ee8c09ec9eb086c801b6ecc Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 31 Jan 2022 12:43:39 +0100 Subject: [PATCH 16/29] rename requestC to responseC --- sensor/kubernetes/localscanner/certificate_requester.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 86e175c3ec16d..d61ca28690ed2 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -56,7 +56,7 @@ func (r *certificateRequesterImpl) dispatchResponses() { case <-r.stopC.Done(): return case msg := <-r.receiveC: - requestC, ok := r.requests.Load(msg.GetRequestId()) + responseC, ok := r.requests.Load(msg.GetRequestId()) if !ok { log.Debugf("request ID %q does not match any known request ID, skipping request", msg.GetRequestId()) @@ -66,7 +66,7 @@ func (r *certificateRequesterImpl) dispatchResponses() { // Doesn't block even if the corresponding call to RequestCertificates is cancelled and no one // ever reads this, because requestC has buffer of 1, and we removed it from `r.request` above, // in case we get more than 1 response for `msg.GetRequestId()`. - requestC.(chan *central.IssueLocalScannerCertsResponse) <- msg + responseC.(chan *central.IssueLocalScannerCertsResponse) <- msg } } } From 07f48cf4989c49a6d42a3897991fc6f310f1208b Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 31 Jan 2022 13:00:13 +0100 Subject: [PATCH 17/29] improve comments --- sensor/kubernetes/localscanner/certificate_requester.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index d61ca28690ed2..c30770c92f67b 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -25,7 +25,7 @@ type CertificateRequester interface { // NewCertificateRequester creates a new certificate requester that communicates through // the specified channels and initializes a new request ID for reach request. // To use it call Start, and then make requests with RequestCertificates, concurrent requests are supported. -// This assumes that the certificate requester is the only consumer of receiveC. +// This assumes that the returned certificate requester is the only consumer of `receiveC`. func NewCertificateRequester(sendC chan<- *central.MsgFromSensor, receiveC <-chan *central.IssueLocalScannerCertsResponse) CertificateRequester { return &certificateRequesterImpl{ @@ -42,10 +42,13 @@ type certificateRequesterImpl struct { requests sync.Map } +// Start makes the certificate requester listen to `receiveC` and forward responses to any request that is running +// as a call to RequestCertificates. func (r *certificateRequesterImpl) Start() { go r.dispatchResponses() } +// Stop makes the certificate stop forwarding responses to running requests. func (r *certificateRequesterImpl) Stop() { r.stopC.Signal() } @@ -71,6 +74,8 @@ func (r *certificateRequesterImpl) dispatchResponses() { } } +// RequestCertificates makes a new request for a new set of local scanner certificates from central. +// This assumes the certificate requester has been started by calling Start. func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { requestID := uuid.NewV4().String() receiveC := make(chan *central.IssueLocalScannerCertsResponse, 1) From a7c024dd18b0f46281fa4c2d0fd4b3a8d5d640ab Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 31 Jan 2022 16:19:54 +0100 Subject: [PATCH 18/29] check the requester was started on RequestCertificates and fail owise --- .../localscanner/certificate_requester.go | 15 +++++++++++---- .../localscanner/certificate_requester_test.go | 17 +++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index c30770c92f67b..b4b047879e02a 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -3,6 +3,7 @@ package localscanner import ( "context" + "github.com/pkg/errors" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/logging" @@ -11,8 +12,11 @@ import ( ) var ( - log = logging.LoggerForModule() - _ CertificateRequester = (*certificateRequesterImpl)(nil) + // ErrCertificateRequesterNotStarted is returned by RequestCertificates when the certificate + // requested is not initialized. + ErrCertificateRequesterNotStarted = errors.New("not started") + log = logging.LoggerForModule() + _ CertificateRequester = (*certificateRequesterImpl)(nil) ) // CertificateRequester requests a new set of local scanner certificates from central. @@ -29,7 +33,6 @@ type CertificateRequester interface { func NewCertificateRequester(sendC chan<- *central.MsgFromSensor, receiveC <-chan *central.IssueLocalScannerCertsResponse) CertificateRequester { return &certificateRequesterImpl{ - stopC: concurrency.NewErrorSignal(), sendC: sendC, receiveC: receiveC, } @@ -46,6 +49,7 @@ type certificateRequesterImpl struct { // as a call to RequestCertificates. func (r *certificateRequesterImpl) Start() { go r.dispatchResponses() + r.stopC.Reset() } // Stop makes the certificate stop forwarding responses to running requests. @@ -75,7 +79,8 @@ func (r *certificateRequesterImpl) dispatchResponses() { } // RequestCertificates makes a new request for a new set of local scanner certificates from central. -// This assumes the certificate requester has been started by calling Start. +// This assumes the certificate requester has been started by calling Start, otherwise this +// returns ErrCertificateRequesterNotStarted. func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { requestID := uuid.NewV4().String() receiveC := make(chan *central.IssueLocalScannerCertsResponse, 1) @@ -99,6 +104,8 @@ func (r *certificateRequesterImpl) send(ctx context.Context, requestID string) e }, } select { + case <-r.stopC.Done(): + return r.stopC.ErrorWithDefault(ErrCertificateRequesterNotStarted) case <-ctx.Done(): return ctx.Err() case r.sendC <- msg: diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 3720089acd10f..4983c72f93ec8 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -42,6 +42,23 @@ func (s *certificateRequesterSuite) TearDownTest() { s.requester.Stop() } +func (s *certificateRequesterSuite) TestNotRequestFailureIfNotStarted() { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + doneErrSig := concurrency.NewErrorSignal() + + go func() { + certs, err := s.requester.RequestCertificates(ctx) + s.Nil(certs) + doneErrSig.SignalWithError(err) + }() + s.requester.Stop() + + requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) + s.Require().True(ok) + s.Equal(ErrCertificateRequesterNotStarted, requestErr) +} + func (s *certificateRequesterSuite) TestRequestCancellation() { requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) doneErrSig := concurrency.NewErrorSignal() From cb2527ee59a33e317f5da57c62d3f0160efa7be3 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Mon, 31 Jan 2022 18:23:53 +0100 Subject: [PATCH 19/29] wip fixing flaky test --- .../localscanner/certificate_requester.go | 14 ++++---- .../certificate_requester_test.go | 34 ++++++++++++------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index b4b047879e02a..409bab51aa256 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -12,10 +12,10 @@ import ( ) var ( - // ErrCertificateRequesterNotStarted is returned by RequestCertificates when the certificate + // ErrCertificateRequesterStopped is returned by RequestCertificates when the certificate // requested is not initialized. - ErrCertificateRequesterNotStarted = errors.New("not started") - log = logging.LoggerForModule() + ErrCertificateRequesterStopped = errors.New("not started") + log = logging.LoggerForModule() _ CertificateRequester = (*certificateRequesterImpl)(nil) ) @@ -52,7 +52,8 @@ func (r *certificateRequesterImpl) Start() { r.stopC.Reset() } -// Stop makes the certificate stop forwarding responses to running requests. +// Stop makes the certificate stop forwarding responses to running requests. Subsequent calls to RequestCertificates +// will fail with ErrCertificateRequesterStopped. func (r *certificateRequesterImpl) Stop() { r.stopC.Signal() } @@ -79,8 +80,7 @@ func (r *certificateRequesterImpl) dispatchResponses() { } // RequestCertificates makes a new request for a new set of local scanner certificates from central. -// This assumes the certificate requester has been started by calling Start, otherwise this -// returns ErrCertificateRequesterNotStarted. +// This assumes the certificate requester is started, otherwise this returns ErrCertificateRequesterStopped. func (r *certificateRequesterImpl) RequestCertificates(ctx context.Context) (*central.IssueLocalScannerCertsResponse, error) { requestID := uuid.NewV4().String() receiveC := make(chan *central.IssueLocalScannerCertsResponse, 1) @@ -105,7 +105,7 @@ func (r *certificateRequesterImpl) send(ctx context.Context, requestID string) e } select { case <-r.stopC.Done(): - return r.stopC.ErrorWithDefault(ErrCertificateRequesterNotStarted) + return r.stopC.ErrorWithDefault(ErrCertificateRequesterStopped) case <-ctx.Done(): return ctx.Err() case r.sendC <- msg: diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 4983c72f93ec8..8bc1ea15e8eff 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -8,6 +8,7 @@ import ( "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/pkg/concurrency" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -42,21 +43,21 @@ func (s *certificateRequesterSuite) TearDownTest() { s.requester.Stop() } -func (s *certificateRequesterSuite) TestNotRequestFailureIfNotStarted() { +func (s *certificateRequesterSuite) TestRequestFailureIfStopped() { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() doneErrSig := concurrency.NewErrorSignal() + s.requester.Stop() go func() { certs, err := s.requester.RequestCertificates(ctx) s.Nil(certs) doneErrSig.SignalWithError(err) }() - s.requester.Stop() requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) s.Require().True(ok) - s.Equal(ErrCertificateRequesterNotStarted, requestErr) + s.Equal(ErrCertificateRequesterStopped, requestErr) } func (s *certificateRequesterSuite) TestRequestCancellation() { @@ -79,7 +80,8 @@ func (s *certificateRequesterSuite) TestRequestSuccess() { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - go s.respondRequest(ctx, nil) + go respondRequest(ctx, s.T(), &s.interceptedRequestID, s.sendC, s.receiveC, nil) + // s.respondRequest(ctx, nil) response, err := s.requester.RequestCertificates(ctx) s.NoError(err) @@ -91,7 +93,8 @@ func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { defer cancel() // Request with different request ID should be ignored. - go s.respondRequest(ctx, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) + go respondRequest(ctx, s.T(), &s.interceptedRequestID, s.sendC, s.receiveC, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) + // s.respondRequest(ctx, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) certs, requestErr := s.requester.RequestCertificates(ctx) s.Nil(certs) @@ -99,12 +102,13 @@ func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { } func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() { - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + timeout := numConcurrentRequests*testTimeout + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) for i := 0; i < numConcurrentRequests; i++ { - go s.respondRequest(ctx, nil) + go respondRequest(ctx, s.T(), &s.interceptedRequestID, s.sendC, s.receiveC, nil) go func() { defer waitGroup.Add(-1) @@ -113,29 +117,33 @@ func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() }() } - ok := concurrency.WaitWithTimeout(&waitGroup, time.Duration(numConcurrentRequests)*testTimeout) + ok := concurrency.WaitWithTimeout(&waitGroup, timeout) s.Require().True(ok) } // respondRequest reads a request from `s.sendC` and responds with `responseRequestID` as the requestID, or with // the same ID as the request if `responseRequestID` is "". // Before sending the response, it stores in s.responseRequestID the request ID for the requests read from `s.sendC`. -func (s *certificateRequesterSuite) respondRequest(ctx context.Context, responseOverwrite *central.IssueLocalScannerCertsResponse) { +func respondRequest(ctx context.Context, t *testing.T, requestID *atomic.Value, + sendC chan *central.MsgFromSensor, + receiveC chan *central.IssueLocalScannerCertsResponse, responseOverwrite *central.IssueLocalScannerCertsResponse) { select { case <-ctx.Done(): - case request := <-s.sendC: + case request := <-sendC: + log.Warnf("respondRequest read request with id %q", request.GetIssueLocalScannerCertsRequest().GetRequestId()) // FIXME interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() - s.NotEmpty(interceptedRequestID) + assert.NotEmpty(t, interceptedRequestID) var response *central.IssueLocalScannerCertsResponse if responseOverwrite != nil { response = responseOverwrite } else { response = ¢ral.IssueLocalScannerCertsResponse{RequestId: interceptedRequestID} } - s.interceptedRequestID.Store(response.GetRequestId()) + requestID.Store(response.GetRequestId()) + log.Warnf("respondRequest sending response %s", response) // FIXME select { case <-ctx.Done(): - case s.receiveC <- response: + case receiveC <- response: } } } From 5fa4ad95c825b17f33682530d1e5894221732d32 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 1 Feb 2022 10:44:30 +0100 Subject: [PATCH 20/29] fix bug in Start sometimes `dispatchResponses` was starting before `r.Stop` was reset, thus stopping dispatching immediatelly --- .../kubernetes/localscanner/certificate_requester.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 409bab51aa256..1473701c436cd 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -14,9 +14,9 @@ import ( var ( // ErrCertificateRequesterStopped is returned by RequestCertificates when the certificate // requested is not initialized. - ErrCertificateRequesterStopped = errors.New("not started") - log = logging.LoggerForModule() - _ CertificateRequester = (*certificateRequesterImpl)(nil) + ErrCertificateRequesterStopped = errors.New("stopped") + log = logging.LoggerForModule() + _ CertificateRequester = (*certificateRequesterImpl)(nil) ) // CertificateRequester requests a new set of local scanner certificates from central. @@ -39,17 +39,17 @@ func NewCertificateRequester(sendC chan<- *central.MsgFromSensor, } type certificateRequesterImpl struct { - stopC concurrency.ErrorSignal sendC chan<- *central.MsgFromSensor receiveC <-chan *central.IssueLocalScannerCertsResponse + stopC concurrency.ErrorSignal requests sync.Map } // Start makes the certificate requester listen to `receiveC` and forward responses to any request that is running // as a call to RequestCertificates. func (r *certificateRequesterImpl) Start() { - go r.dispatchResponses() r.stopC.Reset() + go r.dispatchResponses() } // Stop makes the certificate stop forwarding responses to running requests. Subsequent calls to RequestCertificates From 6987961fa27161448513201fd14366ec2ca7e010 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 1 Feb 2022 10:45:29 +0100 Subject: [PATCH 21/29] replace suite by independent suite objects to avoid interference between tests that causes DATA RACE errors --- .../certificate_requester_test.go | 161 +++++++++--------- 1 file changed, 83 insertions(+), 78 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 8bc1ea15e8eff..9fde7150cdb60 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -9,7 +9,7 @@ import ( "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/pkg/concurrency" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/suite" + "github.com/stretchr/testify/require" ) const ( @@ -20,117 +20,123 @@ var ( testTimeout = time.Second ) -func TestCertificateRequester(t *testing.T) { - suite.Run(t, new(certificateRequesterSuite)) -} - -type certificateRequesterSuite struct { - suite.Suite - sendC chan *central.MsgFromSensor - receiveC chan *central.IssueLocalScannerCertsResponse - requester CertificateRequester - interceptedRequestID atomic.Value -} - -func (s *certificateRequesterSuite) SetupTest() { - s.sendC = make(chan *central.MsgFromSensor) - s.receiveC = make(chan *central.IssueLocalScannerCertsResponse) - s.requester = NewCertificateRequester(s.sendC, s.receiveC) - s.requester.Start() -} - -func (s *certificateRequesterSuite) TearDownTest() { - s.requester.Stop() -} - -func (s *certificateRequesterSuite) TestRequestFailureIfStopped() { - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() +func TestCertificateRequesterRequestFailureIfStopped(t *testing.T) { + f := newFixture(0) + defer f.tearDown() doneErrSig := concurrency.NewErrorSignal() - s.requester.Stop() + f.requester.Stop() go func() { - certs, err := s.requester.RequestCertificates(ctx) - s.Nil(certs) + certs, err := f.requester.RequestCertificates(f.ctx) + assert.Nil(t, certs) doneErrSig.SignalWithError(err) }() requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) - s.Require().True(ok) - s.Equal(ErrCertificateRequesterStopped, requestErr) + require.True(t, ok) + assert.Equal(t, ErrCertificateRequesterStopped, requestErr) } -func (s *certificateRequesterSuite) TestRequestCancellation() { - requestCtx, cancelRequestCtx := context.WithCancel(context.Background()) +func TestCertificateRequesterRequestCancellation(t *testing.T) { + f := newFixture(0) + defer f.tearDown() doneErrSig := concurrency.NewErrorSignal() go func() { - certs, err := s.requester.RequestCertificates(requestCtx) - s.Nil(certs) + certs, err := f.requester.RequestCertificates(f.ctx) + assert.Nil(t, certs) doneErrSig.SignalWithError(err) }() - cancelRequestCtx() + f.cancelCtx() requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) - s.Require().True(ok) - s.Equal(context.Canceled, requestErr) + require.True(t, ok) + assert.Equal(t, context.Canceled, requestErr) } -func (s *certificateRequesterSuite) TestRequestSuccess() { - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() +func TestCertificateRequesterRequestSuccess(t *testing.T) { + f := newFixture(0) + defer f.tearDown() - go respondRequest(ctx, s.T(), &s.interceptedRequestID, s.sendC, s.receiveC, nil) - // s.respondRequest(ctx, nil) + go f.respondRequest(t, nil) - response, err := s.requester.RequestCertificates(ctx) - s.NoError(err) - s.Equal(s.interceptedRequestID.Load(), response.GetRequestId()) + response, err := f.requester.RequestCertificates(f.ctx) + assert.NoError(t, err) + assert.Equal(t, f.interceptedRequestID.Load(), response.GetRequestId()) } -func (s *certificateRequesterSuite) TestResponsesWithUnknownIDAreIgnored() { - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() +func TestCertificateRequesterResponsesWithUnknownIDAreIgnored(t *testing.T) { + f := newFixture(100 * time.Millisecond) + defer f.tearDown() // Request with different request ID should be ignored. - go respondRequest(ctx, s.T(), &s.interceptedRequestID, s.sendC, s.receiveC, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) - // s.respondRequest(ctx, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) + go f.respondRequest(t, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) - certs, requestErr := s.requester.RequestCertificates(ctx) - s.Nil(certs) - s.Equal(context.DeadlineExceeded, requestErr) + certs, requestErr := f.requester.RequestCertificates(f.ctx) + assert.Nil(t, certs) + assert.Equal(t, context.DeadlineExceeded, requestErr) } -func (s *certificateRequesterSuite) TestRequestConcurrentRequestDoNotInterfere() { - timeout := numConcurrentRequests*testTimeout - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() +func TestCertificateRequesterRequestConcurrentRequestDoNotInterfere(t *testing.T) { + f := newFixture(0) + defer f.tearDown() waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) for i := 0; i < numConcurrentRequests; i++ { - go respondRequest(ctx, s.T(), &s.interceptedRequestID, s.sendC, s.receiveC, nil) - + go f.respondRequest(t, nil) go func() { defer waitGroup.Add(-1) - _, err := s.requester.RequestCertificates(ctx) - s.NoError(err) + _, err := f.requester.RequestCertificates(f.ctx) + assert.NoError(t, err) }() } + ok := concurrency.WaitWithTimeout(&waitGroup, time.Duration(numConcurrentRequests)*testTimeout) + require.True(t, ok) +} + +type certificateRequesterFixture struct { + sendC chan *central.MsgFromSensor + receiveC chan *central.IssueLocalScannerCertsResponse + requester CertificateRequester + interceptedRequestID *atomic.Value + ctx context.Context + cancelCtx context.CancelFunc +} + +// newFixture creates a new test fixture that uses `timeout` as context timeout if `timeout` is +// not 0, and `testTimeout` otherwise. +func newFixture(timeout time.Duration) *certificateRequesterFixture { + sendC := make(chan *central.MsgFromSensor) + receiveC := make(chan *central.IssueLocalScannerCertsResponse) + requester := NewCertificateRequester(sendC, receiveC) + var interceptedRequestID atomic.Value + if timeout == 0 { + timeout = testTimeout + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + requester.Start() + return &certificateRequesterFixture{ + sendC: sendC, + receiveC: receiveC, + requester: requester, + ctx: ctx, + cancelCtx: cancel, + interceptedRequestID: &interceptedRequestID, + } +} - ok := concurrency.WaitWithTimeout(&waitGroup, timeout) - s.Require().True(ok) +func (f *certificateRequesterFixture) tearDown() { + f.cancelCtx() + f.requester.Stop() } -// respondRequest reads a request from `s.sendC` and responds with `responseRequestID` as the requestID, or with -// the same ID as the request if `responseRequestID` is "". -// Before sending the response, it stores in s.responseRequestID the request ID for the requests read from `s.sendC`. -func respondRequest(ctx context.Context, t *testing.T, requestID *atomic.Value, - sendC chan *central.MsgFromSensor, - receiveC chan *central.IssueLocalScannerCertsResponse, responseOverwrite *central.IssueLocalScannerCertsResponse) { +// respondRequest reads a request from `f.sendC` and responds with `responseOverwrite` if not nil, or with +// a response with the same ID as the request otherwise. +// Before sending the response, it stores in `f.interceptedRequestID` the request ID for the requests read from `f.sendC`. +func (f *certificateRequesterFixture) respondRequest(t *testing.T, responseOverwrite *central.IssueLocalScannerCertsResponse) { select { - case <-ctx.Done(): - case request := <-sendC: - log.Warnf("respondRequest read request with id %q", request.GetIssueLocalScannerCertsRequest().GetRequestId()) // FIXME + case <-f.ctx.Done(): + case request := <-f.sendC: interceptedRequestID := request.GetIssueLocalScannerCertsRequest().GetRequestId() assert.NotEmpty(t, interceptedRequestID) var response *central.IssueLocalScannerCertsResponse @@ -139,11 +145,10 @@ func respondRequest(ctx context.Context, t *testing.T, requestID *atomic.Value, } else { response = ¢ral.IssueLocalScannerCertsResponse{RequestId: interceptedRequestID} } - requestID.Store(response.GetRequestId()) - log.Warnf("respondRequest sending response %s", response) // FIXME + f.interceptedRequestID.Store(response.GetRequestId()) select { - case <-ctx.Done(): - case receiveC <- response: + case <-f.ctx.Done(): + case f.receiveC <- response: } } } From a7f46004102acd1ebb3d6578d7dad62e30c50dbe Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 1 Feb 2022 17:09:19 +0100 Subject: [PATCH 22/29] improve comments and logs --- sensor/kubernetes/localscanner/certificate_requester.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 1473701c436cd..03481109c78ae 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -13,7 +13,7 @@ import ( var ( // ErrCertificateRequesterStopped is returned by RequestCertificates when the certificate - // requested is not initialized. + // requester is not initialized. ErrCertificateRequesterStopped = errors.New("stopped") log = logging.LoggerForModule() _ CertificateRequester = (*certificateRequesterImpl)(nil) @@ -54,6 +54,7 @@ func (r *certificateRequesterImpl) Start() { // Stop makes the certificate stop forwarding responses to running requests. Subsequent calls to RequestCertificates // will fail with ErrCertificateRequesterStopped. +// Currently active calls to RequestCertificates can be cancelled with the provided context. func (r *certificateRequesterImpl) Stop() { r.stopC.Signal() } @@ -66,7 +67,7 @@ func (r *certificateRequesterImpl) dispatchResponses() { case msg := <-r.receiveC: responseC, ok := r.requests.Load(msg.GetRequestId()) if !ok { - log.Debugf("request ID %q does not match any known request ID, skipping request", + log.Debugf("request ID %q does not match any known request ID, dropping response", msg.GetRequestId()) continue } @@ -109,7 +110,6 @@ func (r *certificateRequesterImpl) send(ctx context.Context, requestID string) e case <-ctx.Done(): return ctx.Err() case r.sendC <- msg: - log.Debugf("request to issue local Scanner certificates sent to Central successfully: %v", msg) return nil } } From 43ddb58fb60c77d620e27ec0c35578bf74594b43 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 1 Feb 2022 17:17:26 +0100 Subject: [PATCH 23/29] add test with requester never started --- .../certificate_requester_test.go | 48 +++++++++++++------ 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 9fde7150cdb60..3dd2f9ef670c3 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -21,24 +21,40 @@ var ( ) func TestCertificateRequesterRequestFailureIfStopped(t *testing.T) { - f := newFixture(0) - defer f.tearDown() - doneErrSig := concurrency.NewErrorSignal() - - f.requester.Stop() - go func() { - certs, err := f.requester.RequestCertificates(f.ctx) - assert.Nil(t, certs) - doneErrSig.SignalWithError(err) - }() - - requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) - require.True(t, ok) - assert.Equal(t, ErrCertificateRequesterStopped, requestErr) + testCases := map[string]struct { + startRequester bool + }{ + "requester not started": {false}, + "requester stopped before request": {true}, + } + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + f := newFixture(0) + if tc.startRequester { + f.requester.Start() + } + defer f.tearDown() + doneErrSig := concurrency.NewErrorSignal() + + if tc.startRequester { + f.requester.Stop() + } + go func() { + certs, err := f.requester.RequestCertificates(f.ctx) + assert.Nil(t, certs) + doneErrSig.SignalWithError(err) + }() + + requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) + require.True(t, ok) + assert.Equal(t, ErrCertificateRequesterStopped, requestErr) + }) + } } func TestCertificateRequesterRequestCancellation(t *testing.T) { f := newFixture(0) + f.requester.Start() defer f.tearDown() doneErrSig := concurrency.NewErrorSignal() @@ -56,6 +72,7 @@ func TestCertificateRequesterRequestCancellation(t *testing.T) { func TestCertificateRequesterRequestSuccess(t *testing.T) { f := newFixture(0) + f.requester.Start() defer f.tearDown() go f.respondRequest(t, nil) @@ -67,6 +84,7 @@ func TestCertificateRequesterRequestSuccess(t *testing.T) { func TestCertificateRequesterResponsesWithUnknownIDAreIgnored(t *testing.T) { f := newFixture(100 * time.Millisecond) + f.requester.Start() defer f.tearDown() // Request with different request ID should be ignored. @@ -79,6 +97,7 @@ func TestCertificateRequesterResponsesWithUnknownIDAreIgnored(t *testing.T) { func TestCertificateRequesterRequestConcurrentRequestDoNotInterfere(t *testing.T) { f := newFixture(0) + f.requester.Start() defer f.tearDown() waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) @@ -114,7 +133,6 @@ func newFixture(timeout time.Duration) *certificateRequesterFixture { timeout = testTimeout } ctx, cancel := context.WithTimeout(context.Background(), timeout) - requester.Start() return &certificateRequesterFixture{ sendC: sendC, receiveC: receiveC, From 03b0153b7fb69a430171203fe4191fd79fc45369 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 1 Feb 2022 17:22:49 +0100 Subject: [PATCH 24/29] simplify running failure tests assuming test execution CI task already timeouts as needed --- .../certificate_requester_test.go | 28 ++++--------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 3dd2f9ef670c3..b77da65d4d8d8 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -30,23 +30,14 @@ func TestCertificateRequesterRequestFailureIfStopped(t *testing.T) { for tcName, tc := range testCases { t.Run(tcName, func(t *testing.T) { f := newFixture(0) - if tc.startRequester { - f.requester.Start() - } defer f.tearDown() - doneErrSig := concurrency.NewErrorSignal() - if tc.startRequester { + f.requester.Start() f.requester.Stop() } - go func() { - certs, err := f.requester.RequestCertificates(f.ctx) - assert.Nil(t, certs) - doneErrSig.SignalWithError(err) - }() - - requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) - require.True(t, ok) + + certs, requestErr := f.requester.RequestCertificates(f.ctx) + assert.Nil(t, certs) assert.Equal(t, ErrCertificateRequesterStopped, requestErr) }) } @@ -56,17 +47,10 @@ func TestCertificateRequesterRequestCancellation(t *testing.T) { f := newFixture(0) f.requester.Start() defer f.tearDown() - doneErrSig := concurrency.NewErrorSignal() - go func() { - certs, err := f.requester.RequestCertificates(f.ctx) - assert.Nil(t, certs) - doneErrSig.SignalWithError(err) - }() f.cancelCtx() - - requestErr, ok := doneErrSig.WaitWithTimeout(testTimeout) - require.True(t, ok) + certs, requestErr := f.requester.RequestCertificates(f.ctx) + assert.Nil(t, certs) assert.Equal(t, context.Canceled, requestErr) } From b84558b391b2ed30f9c92a1f44fd09a5a5730a2e Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 1 Feb 2022 17:48:48 +0100 Subject: [PATCH 25/29] improve concurrent request test - check 10 concurrent requests - add jitter to simulate out of order responses --- .../localscanner/certificate_requester_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index b77da65d4d8d8..de40a953f7c56 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -2,6 +2,7 @@ package localscanner import ( "context" + "math/rand" "sync/atomic" "testing" "time" @@ -13,7 +14,7 @@ import ( ) const ( - numConcurrentRequests = 2 + numConcurrentRequests = 10 ) var ( @@ -59,7 +60,7 @@ func TestCertificateRequesterRequestSuccess(t *testing.T) { f.requester.Start() defer f.tearDown() - go f.respondRequest(t, nil) + go f.respondRequest(t, 0, nil) response, err := f.requester.RequestCertificates(f.ctx) assert.NoError(t, err) @@ -72,7 +73,7 @@ func TestCertificateRequesterResponsesWithUnknownIDAreIgnored(t *testing.T) { defer f.tearDown() // Request with different request ID should be ignored. - go f.respondRequest(t, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) + go f.respondRequest(t, 0, ¢ral.IssueLocalScannerCertsResponse{RequestId: "UNKNOWN"}) certs, requestErr := f.requester.RequestCertificates(f.ctx) assert.Nil(t, certs) @@ -86,7 +87,8 @@ func TestCertificateRequesterRequestConcurrentRequestDoNotInterfere(t *testing.T waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) for i := 0; i < numConcurrentRequests; i++ { - go f.respondRequest(t, nil) + // use jitter to simulate out of order responses. + go f.respondRequest(t, 100*time.Millisecond, nil) go func() { defer waitGroup.Add(-1) _, err := f.requester.RequestCertificates(f.ctx) @@ -133,9 +135,10 @@ func (f *certificateRequesterFixture) tearDown() { } // respondRequest reads a request from `f.sendC` and responds with `responseOverwrite` if not nil, or with -// a response with the same ID as the request otherwise. +// a response with the same ID as the request otherwise. If `jitter` is greater than 0 then this waits for a +// random time between 0 and `jitter` before sending the response. // Before sending the response, it stores in `f.interceptedRequestID` the request ID for the requests read from `f.sendC`. -func (f *certificateRequesterFixture) respondRequest(t *testing.T, responseOverwrite *central.IssueLocalScannerCertsResponse) { +func (f *certificateRequesterFixture) respondRequest(t *testing.T, jitter time.Duration, responseOverwrite *central.IssueLocalScannerCertsResponse) { select { case <-f.ctx.Done(): case request := <-f.sendC: @@ -148,6 +151,9 @@ func (f *certificateRequesterFixture) respondRequest(t *testing.T, responseOverw response = ¢ral.IssueLocalScannerCertsResponse{RequestId: interceptedRequestID} } f.interceptedRequestID.Store(response.GetRequestId()) + if jitter > 0 { + time.Sleep(time.Duration(rand.Int63n(jitter.Milliseconds())) * time.Millisecond) + } select { case <-f.ctx.Done(): case f.receiveC <- response: From 3e7eac1782bfa8d6e715d56ba3c69f02a087de01 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Tue, 1 Feb 2022 18:04:44 +0100 Subject: [PATCH 26/29] add a deterministic response shuffling strategy --- .../certificate_requester_test.go | 58 ++++++++++++------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index de40a953f7c56..95ba99d5aea6f 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -81,22 +81,40 @@ func TestCertificateRequesterResponsesWithUnknownIDAreIgnored(t *testing.T) { } func TestCertificateRequesterRequestConcurrentRequestDoNotInterfere(t *testing.T) { - f := newFixture(0) - f.requester.Start() - defer f.tearDown() - waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) - - for i := 0; i < numConcurrentRequests; i++ { - // use jitter to simulate out of order responses. - go f.respondRequest(t, 100*time.Millisecond, nil) - go func() { - defer waitGroup.Add(-1) - _, err := f.requester.RequestCertificates(f.ctx) - assert.NoError(t, err) - }() + testCases := map[string]struct { + randomResponseDelay bool + }{ + "decreasing response delay": {false}, + "random response delay": {true}, + } + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + f := newFixture(0) + f.requester.Start() + defer f.tearDown() + waitGroup := concurrency.NewWaitGroup(numConcurrentRequests) + + for i := 0; i < numConcurrentRequests; i++ { + i := i + var responseDelay time.Duration + if tc.randomResponseDelay { + // randomly out of order responses. + responseDelay = time.Duration(rand.Intn(100)) * time.Millisecond + } else { + // responses are responded increasingly faster, so always out of order. + responseDelay = time.Duration(numConcurrentRequests-(i+1)) * 10 * time.Millisecond + } + go f.respondRequest(t, responseDelay, nil) + go func() { + defer waitGroup.Add(-1) + _, err := f.requester.RequestCertificates(f.ctx) + assert.NoError(t, err) + }() + } + ok := concurrency.WaitWithTimeout(&waitGroup, time.Duration(numConcurrentRequests)*testTimeout) + require.True(t, ok) + }) } - ok := concurrency.WaitWithTimeout(&waitGroup, time.Duration(numConcurrentRequests)*testTimeout) - require.True(t, ok) } type certificateRequesterFixture struct { @@ -135,10 +153,10 @@ func (f *certificateRequesterFixture) tearDown() { } // respondRequest reads a request from `f.sendC` and responds with `responseOverwrite` if not nil, or with -// a response with the same ID as the request otherwise. If `jitter` is greater than 0 then this waits for a -// random time between 0 and `jitter` before sending the response. +// a response with the same ID as the request otherwise. If `responseDelay` is greater than 0 then this function +// waits for that time before sending the response. // Before sending the response, it stores in `f.interceptedRequestID` the request ID for the requests read from `f.sendC`. -func (f *certificateRequesterFixture) respondRequest(t *testing.T, jitter time.Duration, responseOverwrite *central.IssueLocalScannerCertsResponse) { +func (f *certificateRequesterFixture) respondRequest(t *testing.T, responseDelay time.Duration, responseOverwrite *central.IssueLocalScannerCertsResponse) { select { case <-f.ctx.Done(): case request := <-f.sendC: @@ -151,8 +169,8 @@ func (f *certificateRequesterFixture) respondRequest(t *testing.T, jitter time.D response = ¢ral.IssueLocalScannerCertsResponse{RequestId: interceptedRequestID} } f.interceptedRequestID.Store(response.GetRequestId()) - if jitter > 0 { - time.Sleep(time.Duration(rand.Int63n(jitter.Milliseconds())) * time.Millisecond) + if responseDelay > 0 { + time.Sleep(responseDelay) } select { case <-f.ctx.Done(): From 55d164a74f7cafabe67c103021c392d3ab724fac Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 2 Feb 2022 12:43:20 +0100 Subject: [PATCH 27/29] change test case struct for concurrent request test to make the test easier to read --- .../certificate_requester_test.go | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index 95ba99d5aea6f..e740a8dd9fdda 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -82,10 +82,16 @@ func TestCertificateRequesterResponsesWithUnknownIDAreIgnored(t *testing.T) { func TestCertificateRequesterRequestConcurrentRequestDoNotInterfere(t *testing.T) { testCases := map[string]struct { - randomResponseDelay bool + responseDelayFunc func(requestIndex int) (responseDelay time.Duration) }{ - "decreasing response delay": {false}, - "random response delay": {true}, + "decreasing response delay": {func(requestIndex int) (responseDelay time.Duration) { + // responses are responded increasingly faster, so always out of order. + return time.Duration(numConcurrentRequests-(requestIndex+1)) * 10 * time.Millisecond + }}, + "random response delay": {func(requestIndex int) (responseDelay time.Duration) { + // randomly out of order responses. + return time.Duration(rand.Intn(100)) * time.Millisecond + }}, } for tcName, tc := range testCases { t.Run(tcName, func(t *testing.T) { @@ -96,14 +102,7 @@ func TestCertificateRequesterRequestConcurrentRequestDoNotInterfere(t *testing.T for i := 0; i < numConcurrentRequests; i++ { i := i - var responseDelay time.Duration - if tc.randomResponseDelay { - // randomly out of order responses. - responseDelay = time.Duration(rand.Intn(100)) * time.Millisecond - } else { - // responses are responded increasingly faster, so always out of order. - responseDelay = time.Duration(numConcurrentRequests-(i+1)) * 10 * time.Millisecond - } + responseDelay := tc.responseDelayFunc(i) go f.respondRequest(t, responseDelay, nil) go func() { defer waitGroup.Add(-1) From 35e39a37a7d6fe6c676d143592c3e45a453d1ae7 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 2 Feb 2022 12:45:31 +0100 Subject: [PATCH 28/29] improve comment --- sensor/kubernetes/localscanner/certificate_requester.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester.go b/sensor/kubernetes/localscanner/certificate_requester.go index 03481109c78ae..4de43ad69a69e 100644 --- a/sensor/kubernetes/localscanner/certificate_requester.go +++ b/sensor/kubernetes/localscanner/certificate_requester.go @@ -54,7 +54,8 @@ func (r *certificateRequesterImpl) Start() { // Stop makes the certificate stop forwarding responses to running requests. Subsequent calls to RequestCertificates // will fail with ErrCertificateRequesterStopped. -// Currently active calls to RequestCertificates can be cancelled with the provided context. +// Currently active calls to RequestCertificates will continue running until cancelled or timed out via the +// provided context. func (r *certificateRequesterImpl) Stop() { r.stopC.Signal() } From 8a059f374b76aa5486448a44e263de2550890dbd Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Wed, 2 Feb 2022 12:53:28 +0100 Subject: [PATCH 29/29] make request delay wait cancellable --- .../kubernetes/localscanner/certificate_requester_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sensor/kubernetes/localscanner/certificate_requester_test.go b/sensor/kubernetes/localscanner/certificate_requester_test.go index e740a8dd9fdda..d69aae3374c38 100644 --- a/sensor/kubernetes/localscanner/certificate_requester_test.go +++ b/sensor/kubernetes/localscanner/certificate_requester_test.go @@ -169,7 +169,11 @@ func (f *certificateRequesterFixture) respondRequest(t *testing.T, responseDelay } f.interceptedRequestID.Store(response.GetRequestId()) if responseDelay > 0 { - time.Sleep(responseDelay) + select { + case <-f.ctx.Done(): + return + case <-time.After(responseDelay): + } } select { case <-f.ctx.Done():