ROX-12835: Add support for NodeScanV2 to Sensor#3533
Conversation
|
Skipping CI for Draft Pull Request. |
|
/test all |
|
Images are ready for the commit at 7bbcff1. To use with deploy scripts, first |
0844c2d to
f99b0a8
Compare
|
/test all |
1 similar comment
|
/test all |
Maddosaurus
left a comment
There was a problem hiding this comment.
I've discussed this code with Piotr in a call already, so I am happy to approve. These are a nitpick and a question that I had re-reading the code
| "github.com/stackrox/rox/sensor/common" | ||
| ) | ||
|
|
||
| // NodeScanHandler .... |
There was a problem hiding this comment.
Should we maybe have a more detailed description here?
Something like "NodeScanHandler keeps track of all arriving NodeScanV2 messages and sends them to central for processing"
sensor/kubernetes/sensor/sensor.go
Outdated
| auditLogEventsInput := make(chan *sensorInternal.AuditEvents) | ||
| auditLogCollectionManager := compliance.NewAuditLogCollectionManager() | ||
|
|
||
| indicators := make(chan *central.MsgFromSensor) |
There was a problem hiding this comment.
Out of curiosity: Is there a reason you moved the var up here?
There was a problem hiding this comment.
I think I had a variant of a solution that was using this channel. But this is a leftover from it and it can be moved back to where it was.
|
/test gke-nongroovy-e2e-tests |
12787b3 to
d7bf5ac
Compare
️✅ There are no secrets present in this pull request anymore.If these secrets were true positive and are still valid, we highly recommend you to revoke them. 🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request. |
|
Current dependencies on/for this PR:
This comment was auto-generated by Graphite. |
| case <-c.stopC.Done(): | ||
| c.stoppedC.SignalWithError(c.stopC.Err()) | ||
|
|
There was a problem hiding this comment.
I think I understand that the intent of this code is to arrange a graceful shutdown.
c.stopC is when the shutdown was requested. c.stoppedC is when the handler actually shut down.
I understand that you likely copied the pattern found in command_handler_impl.go, but I think the pattern is wrong.
This specific fragment will just make for loop repeat quickly eating up CPU until c.nodeScans channel is closed.
I believe the implementation should be fixed here and propagated to other handlers.
I find https://github.com/stackrox/stackrox/blob/master/sensor/common/sensor/central_sender_impl.go to be more clean. Hopefully it can serve as a reference implementation.
A unit test for this handler will be very handy to demonstrate and validate its behavior.
There was a problem hiding this comment.
I find https://github.com/stackrox/stackrox/blob/master/sensor/common/sensor/central_sender_impl.go to be more clean. Hopefully it can serve as a reference implementation.
I would love to follow up on this, as I see there several issues that makes the concurrent code difficult to follow. These are not no-goes, but some patterns that I learnt are violated. Examples:
forwardResponseswrites to a channeltothat is passed as an external parameter and never closes the channel to signal that there will no more messages.componentMsgsCis never closed, so why to have the code that handles the situation when the channel is closed?forwardResponsesstops working as soon asfor !s.stopC.IsDone() {- this is the same situation as here, we drop messages on the floor instead of forwarding them, as we do not have any guarantee that the stream will still be up.
I am not saying that the code is not working, but I have several rules that I follow when working with concurrency:
- the goroutine that writes to a channel is responsible for creating the channel and closing it,
- only the writer knows that there will be no more messages sent to a channel. If that happens, the channel should be closed to inform all readers that there will be no more messages,
- there is exactly one writer goroutine for a channel, but there may be many readers.
Having said that, I am curious to learn why do you think that the code sensor/common/sensor/central_sender_impl.go is cleaner.
This specific fragment will just make for loop repeat quickly eating up CPU until c.nodeScans channel is closed.
Yes, this is an issue and should be addressed. I guess adding return should do the job, but I will check that.
A unit test for this handler will be very handy to demonstrate and validate its behavior.
Agreed! Will try to produce some tests for this.
There was a problem hiding this comment.
Your notes about channels are likely right. I did not check how channels are handled but I definitely saw that some of them aren't closed.
I brought central_sender_impl.go because it more accurately deals with c.stopC and c.stoppedC than this code. Particularly, there should be only one place that sets c.stoppedC - the deferred hook. Everything else like error conditions or closed channels should just initiate the shut down by setting c.stopC. Signalling that the shutdown completed (which c.stoppedC is supposed to do) in the middle of the function seems wrong to me.
sensor/common/compliance/service.go
Outdated
|
|
||
| Output() chan *compliance.ComplianceReturn | ||
| AuditEvents() chan *sensor.AuditEvents | ||
| NodeScansV2() <-chan *storage.NodeScanV2 |
There was a problem hiding this comment.
Maybe just NodeScans?
| NodeScansV2() <-chan *storage.NodeScanV2 | |
| NodeScans() <-chan *storage.NodeScanV2 |
There was a problem hiding this comment.
In regards to V2: You're absolutely right about V1 not being a real NodeScan (src), but as the existing feature is called NodeScan in every piece of currently used code, it might be very confusing to introduce a completely different thing under the same name, right?
There was a problem hiding this comment.
I will make a proposal to clear up the naming and their meaning, as I see that it makes even more problems when navigating though the code of Central and Scanner.
There was a problem hiding this comment.
I don't mind keeping V2. In this case I'd vote to have V2 more consistently. There are some spots in this PR where it could be (re-)added.
There was a problem hiding this comment.
Follow-up to the naming discussion: #3755
msugakov
left a comment
There was a problem hiding this comment.
Nothing blocking the PR, mostly nitpicks in attempt to understand the existing design and how what you introduce fits in it. I'm ok if this change gets merged as-is and you address my comments in a follow-up.
|
|
||
| // NodeScanHandler is responsible for handling the arriving NodeScanV2 messages, processing then, and sending them to central | ||
| type NodeScanHandler interface { | ||
| Stopped() concurrency.ReadOnlyErrorSignal |
There was a problem hiding this comment.
Nitpicking, but there's nothing besides the test that calls Stopped() function. Why have it at all?
| } | ||
|
|
||
| func (c *nodeScanHandlerImpl) run() { | ||
| defer c.stoppedC.SignalWithError(c.stopC.Err()) |
There was a problem hiding this comment.
It seems to me, that Start() can only be called once. At the moment when run() completes either due to Stop() or due to error, c.stopC will be set. Any subsequent attempts to Start() will not fail (maybe they should?) and will lead to run() quickly exiting.
In such situation, I think it makes sense to defer close c.toCentral so that we properly close channel and eliminate one forwarding goroutine (see central_sender_impl.go).
Please take into account that select/case will choose a random branch if multiple are satisfied, therefore it makes sense to additionally guard for { loop with a check. E.g. for !c.stopC.IsDone() {.
There was a problem hiding this comment.
Got it covered!
I am only not fully sure which goroutine is meant with "eliminate one forwarding goroutine" 🤔
| // give central a chance to receive something before we lock s.mu | ||
| <-time.After(50 * time.Millisecond) |
There was a problem hiding this comment.
This seems brittle. What if after a bad refactoring (in the future) you give it more time and it receives all 1000 messages? What if 50 milliseconds are not enough to receive even 1 message on some systems?
I have no concrete ideas for improvement but it feels the test tries to do too much.
There was a problem hiding this comment.
Correct. I fixed that in a bit less flaky (but not ideal) way.
|
|
||
| // TestStopHandler goal is to stop handler while there are still some messages to process | ||
| // in the channel passed into NewNodeScanHandler. | ||
| // We expect that premature stop of the handler produces no race condition or goroutine leak. |
There was a problem hiding this comment.
How come no goroutine leaks if the one started by nodeScansProducerThatStops.start() should hang and remain blocked after sending the first few items over its channel?
There was a problem hiding this comment.
Correct, it leaked. This should be fixed now (checked with https://github.com/uber-go/goleak, which I cannot include as the test logger used here leaks one goroutine)
There was a problem hiding this comment.
I played more with the leaks and the goleak from Uber. I was able to eliminate all leaks in the code (test & feat). I hope it is okay to add https://github.com/uber-go/goleak as a new dependency.
|
Rebasing against master |
a070b11 to
67855c8
Compare
b2dc987dd4 WIP: Handle NodeScanV2 in sensor 39923562b4 Add new Sensor Event f8d7bf497c Implement Sensor->Central comm for NodeScanV2 a17da26349 Properly add sensor component 19b9996569 Remove debug logging, unbuffer channel
67855c8 to
7c3634a
Compare
|
/retest |
|
/test gke-postgres-nongroovy-e2e-tests |
| package metrics | ||
|
|
||
| // Resource represents the resource that we want to time. | ||
| // |
There was a problem hiding this comment.
I think this is a go1.19 thing. Since we are updating the file anyway, might as well keep it, but I have also had the displeasure of dealing with this when trying to style this repo
There was a problem hiding this comment.
Yeah, I saw that and reverted to go 1.18
| s.auditEvents <- t.AuditEvents | ||
| s.auditLogCollectionManager.AuditMessagesChan() <- msg | ||
| case *sensor.MsgFromCompliance_NodeScanV2: | ||
| log.Infof("NodeScanV2 message received: %v", msg) |
There was a problem hiding this comment.
Up to you, but want to keep this as a Debug log if that'll help development?
There was a problem hiding this comment.
We will work more on the sensor corner, so there will be a chance to add some more meaningful debugging
|
|
||
| Output() chan *compliance.ComplianceReturn | ||
| AuditEvents() chan *sensor.AuditEvents | ||
| NodeScans() <-chan *storage.NodeScanV2 |
There was a problem hiding this comment.
curious, do we ever send to the other channels? why is this the only receive-only channel?
There was a problem hiding this comment.
I am following a pattern where I explicitly identify writers and readers by using general channel in the first case and the read-only in the second case. Here, this helps us to discover early potential errors when the reader would try to write to a channel.
| func assertNoGoroutineLeaks(t *testing.T) { | ||
| goleak.VerifyNone(t, | ||
| // Ignore a known leak: https://github.com/DataDog/dd-trace-go/issues/1469 | ||
| goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), |
There was a problem hiding this comment.
uh oh. is this a bad leak? I see it's unresolved, but I wonder how much this affects us
There was a problem hiding this comment.
I didn't dig deeper, but there is some info on the link added. I think the leak is in a dependency of a dependency of a dependency and it may be tricky to get rid of.
| defer close(nodeScans) | ||
| h := NewNodeScanHandler(nodeScans) | ||
|
|
||
| assert.NoError(s.T(), h.Start()) |
There was a problem hiding this comment.
since we have a suite, maybe we should use s.Assert().NoError(h.Start())? Doesn't really matter tbh, though
There was a problem hiding this comment.
Oh, sorry, you are right, this would look nicer. I spotted your review first after merging, but I will apply the suggestions in the next PR in the stack (#3755) or in a separate PR.
| // in the channel passed into NewNodeScanHandler. | ||
| // We expect that premature stop of the handler produces no race condition or goroutine leak. | ||
| // Exec with: go test -race -count=1 -v -run ^TestNodeScanHandler$ ./sensor/common/compliance | ||
| func (s *NodeScanHandlerTestSuite) TestStopHandler() { |
There was a problem hiding this comment.
Good question. I need to check whether this is included in the race tests.
| } | ||
|
|
||
| func (c *nodeScanHandlerImpl) ProcessMessage(_ *central.MsgToSensor) error { | ||
| // This component doesn't actually process or handle any messages sent from Central to Sensor (yet). |
There was a problem hiding this comment.
Does this imply there is a plan to add this (I'm imagining Central requesting a scan periodically as part of reprocessing or user forcing it)?
There was a problem hiding this comment.
This is added to implement the handler interface. It does not meant that we will support anything arriving from central, but we do have such possibility of we want to :)
| return c.toCentral | ||
| } | ||
|
|
||
| func (c *nodeScanHandlerImpl) Start() error { |
There was a problem hiding this comment.
Do we want to allow multiple Start calls to happen?
For example, this seems possible:
Start()
Start()
Start()
...
Stop()
There was a problem hiding this comment.
I investigated and this will cause a panic due to closing a channel twice (only when the feature-flag is enabled). I will provide a fix and a test for this.
| } | ||
|
|
||
| func (c *nodeScanHandlerImpl) sendScan(scan *storage.NodeScanV2) { | ||
| if scan != nil { |
There was a problem hiding this comment.
up to you, but maybe we can do something like the following to remove all the indentation:
if scan == nil {
return
}
...
| }, | ||
| }, | ||
| }: | ||
| return |
There was a problem hiding this comment.
do we need the return here and for the other case?
There was a problem hiding this comment.
No, no need for them. I will remove them.
| toCentral chan *central.MsgFromSensor | ||
|
|
||
| stopC concurrency.ErrorSignal | ||
| stoppedC concurrency.ErrorSignal |
There was a problem hiding this comment.
This seems to be a leftover.
7ffc6be ROX-13368: Skip failing nongroovy tests on PG (#3721) bbdd7a0 Bump github.com/gofrs/uuid from 4.3.0+incompatible to 4.3.1+incompatible (#3642) 1f253f2 Bump github.com/google/certificate-transparency-go from 1.1.3 to 1.1.4 (#3543) d434c8d [ROX-13030] : Add delete collection API endpoint and service implementation (#3648) f062c21 Dashrews/ROX-13253 wait for central-db to come back after bounce and allow FATAL connection lost error (#3537) edc1174 CI: Fill the gaps for https://testgrid.k8s.io/ (#3715) 86d7c54 ROX-13231: use passed context when non-postgres (#3540) 9093195 Add less specific type for BE collection response string (#3728) 5abb652 Only enable ROX_OBJECT_COLLECTIONS feature flag during gke-postgres-ui-e2e job (#3727) 4f64cd1 Add centralDBOnly mode in render (#3707) d67bbe5 Dashrews/ROX-13082 UUID searcher and common updates to set allow use of postgres UUID PR 1 of 4 (#3679) 6f829d5 ROX-13259: graphInit called during init time (#3705) f3bc50d ROX-13380: Conditional rendering edges for deployments and namespaces (#3641) 3764476 ROX-12319: implement smoke test step with groovy test filter (#3220) f202fd4 ROX-11826: Disable kernel support package uploads for managed central (#3661) 61f03dc ROX-11101: Remove deprecated resources from central (#3115) e6aa6d7 ROX-11101: Restore Role permission in UI (#3428) 3203e04 ROX-11101: Remove deprecated resources (#3036) a35f41e Bump golang.org/x/sys from 0.1.0 to 0.2.0 (#3733) 4120524 Bump snakeyaml from 1.29 to 1.33 in /qa-tests-backend (#3732) e1785c0 Bump github.com/coreos/go-systemd/v22 from 22.4.0 to 22.5.0 (#3724) 870df4a Bump google.golang.org/api from 0.101.0 to 0.102.0 (#3723) 721454c Generalize User-Agent setup (#3672) 6a11bf0 Bumps collector version to 3.11.x-145-gc345f72f5e (#3736) ec5d343 [ROX-12923] Walk retries - remainder work (#3729) 40f3d43 ROX-13440: Replace ambiguous central with sensor in networkGraph integration test (#3730) 281ed22 Bump groovy-xml from 2.5.18 to 2.5.19 in /qa-tests-backend (#3741) 49d1651 Bump github.com/prometheus/client_golang from 1.13.1 to 1.14.0 (#3742) b5544aa Bump cloud.google.com/go/storage from 1.27.0 to 1.28.0 (#3743) 9c61e53 ensure CVSS is present for istio vulns (#3706) ae29d52 ROX-13452: don't always clobber scoped ctx when non-postgres (#3748) 517bf05 ROX-13261: DryRunUpdate on collection datastore (#3687) baf7654 ROX-13378: Group new resources with deprecated in UI (#3690) 569922f ROX-13421: Enable roxctl netpol generate and add tech-preview messages (#3740) 2465fc5 Dashrews/ROX-13082 UUID generator templates PR 2 of 4 (#3681) c093c68 Bump slack-api-client from 1.20.2 to 1.27.0 in /qa-tests-backend (#3752) 2c860bb Bump ubi8-micro from 8.6 to 8.7 in /operator (#3751) 80eb04c Make deploy.sh and deploy-local.sh pass shellcheck (#3582) 2182b43 Dashrews/ROX-13082 UUID test updates PR 3 of 4 (#3694) 6dc6ca5 [ROX-13403] : Fix node -> topVuln sub resolver bug when node cves is empty (#3689) 1b21361 Move integration tests for page title from general to specific containers (#3675) e1a9f31 Bump google.golang.org/api from 0.102.0 to 0.103.0 (#3773) a05ea31 Bump golang.org/x/crypto from 0.1.0 to 0.2.0 (#3772) 65ddf4f ROX-12824: Add roxctl commands to generate Central DB bundle (#3602) c3f1e2f Remove obsolete authProviders request for Integrations page (#3759) 7ccd54d Dashrews/ROX-13082 UUID protos generated PR 4 of 4 (#3698) 9ab5c8f cleanup image digest utilities (#3764) 187ed44 ROX-11931: Convert junit failure artifacts to Slack attachments (#3438) b5d8790 ROX-13432: leaning up unused code copied/pasted from topology demo (#3750) ab05bfc Refactor collection form page for better composition (#3744) c5562f7 Remove babel devDependencies in ui-components (#3761) 2b90b3a Extract collection form from drawer wrapper layout (#3745) a779fc9 [ROX-12625 + ROX-13032] : Add GetCollectionCount and UpdateCollection endpoints and services (#3749) e77f0da Upgrade cypress 11.0.0 devDependencies in ui (#3760) a3fba94 ROX-13068: Use real data for deployment details (#3688) 4c7d90e ROX-12617: Collection to search query converter (#3683) 3e98aec ROX-13067: fill out port configurations section of deployment details (#3714) a48de36 ROX-12835: Add support for NodeScanV2 to Sensor (#3533) 30c5dc7 ROX-13466: Fix deletion of groups with empty properties (#3756) 5cb2470 Add autocomplete for name selector dropdowns (#3676) b9a75ad ROX-13464 adding flows dropdown in NG (#3763) 3217a67 [ROX-13500] Perform type check for V1 CronJob (#3787) af3790d Remove bulk delete from collections table (#3776) dda123b Add more info in migration log (#3788) 179f0c9 ROX-13502: Remove the circular dependency between cluster datastore init and cscc notifier init (#3790) 029d584 Update SCANNER_VERSION (#3774) cbca57c Bump github.com/ckaznocha/protoc-gen-lint from 0.2.4 to 0.3.0 (#3783) 3613b56 Bump golang.org/x/tools from 0.2.0 to 0.3.0 (#3782) 5fc0a6a Bump github.com/google/go-containerregistry from 0.12.0 to 0.12.1 (#3781) 1d1c687 Bump controller-gen version to 0.10.0 (#3754) c3a5290 Untie documentation link from the product version (#3799) ed822aa use correct package for migration (#3784) 397a0b4 Validate that label keys are valid k8s labels and ensure correct key splitting (#3777) edd1050 Rename variable ScannerGRPCEndpoint to ScannerSlimGRPCEndpoint (#3657) 6662c9f ROX-13378: Access Control page permissions (#3720) b0e73c5 fix Operator reconciliation for external Central DB (#3796) b83bc1f ROX-13505: Fix error log scanning the postgres stat collection (#3795) ca660cb Prevent the collection being edited from displaying in its own embedded list (#3778) 3f7b3fc [ROX-13441][POSTGRES] Propagate context correctly in retries (#3793) e0cbc6f ROX-12839: Update changelog to announce removal of in-product docs (#3805) 696e8bc [ROX-12358] Follow up on vulnerability request proto change (#2851) c4b46d8 Change getCollectionCount endpoint and updateCollection request type 5f2efbc remove make proto-fmt (#3804) 0c75540 Remove os.Std* from roxctl/central (#3758) 25a90de Add ability to view embedded collections in a pop up modal (#3747) 5c1bf81 ROX-13240: fix scanner-slim updates when WebSockets are used (#3704) 1d98577 Add more context to jira notifier logging (#3812) da2fd28 ROX-13031: DryRun Collection API (#3766) 1c418d5 Test data migration code in postgres tests (#3803) ed95b37 Update UI Collection requests for BE compatibiltiy (#3762) 09cc188 ROX-11931: Fix junit-parse install in CI (#3811) d2b01e3 ROX-12814: Disable PolicyFieldsTest on openshift. (#3797) d10ce27 ROX-13345: disable 'missing required registry' aspect on openshift (#3798) 3d22396 Update collector to 3.12 (#3809) 1eb33fb ROX-13347: Modify scope queries to included quoted cluster and nameace names, to allow exact matches instead of erroneous and unintended prefix matches. (#3767) 3811a69 ROX-12621: list collection selectors api (#3806) f6d3f9d Add migration for groups with invalid values (#3789) cc21125 Bugfixes for collection autocomplete (#3816) 7623dec ROX-9350 Use fine-grained host paths for compliance mounts (#2479) b4bf5c2 Fix collector volumeMounts (#3826) 0e9be05 ROX-12953: figure out last 4 versions of sensor automatically (#3611) 459c7ae ROX-12814: Add proper todo for reenabling the test (#3817) 9ee40ff ROX-13523: add isEnabled enum to central db spec (#3815) 535bc72 Replace requestConfig with routeMatcherMap in helper functions for integration tests (#3686) 2b75b61 `gosec` G104: Add `ShouldErr(err)` that returns `err` (#3830) fb1b82f WIP: Introduce nodescan call 35f8a8f WIP: Prepare converter 716144b Moved and renamed fake nodescan tests 4748de7 Introduce real node scanner with conversion functions 5e6d9a8 wip: real scanner 0169868 wip: log results 1438d2c wip: Debug Analyze call b09894c wip: Debug Analyze call 3ceed72 wip: Update and improve debug logs d1669fd Remove copied lib, bump scanner version, add debug 14a3f73 Merge branch 'master' into mm/ROX-12967-real-nodescan fbd0450 Fix style issues 17ccb31 Debug: let both scans finish to see what they return

Description
This PR enriches sensor with support to handling the NodeScanV2 messages.
In the current scope, the messages should be forwarded to Central without any manipulation (passthrough).
This PR also adds an empty Central pipeline that receives the NodeScanV2 messages from sensor and prints a log line that the message was received.
All these parts should work only when the feature-flag
ROX_RHCOS_NODE_SCANNING=trueis enabled.Checklist
[ ] Evaluated and added CHANGELOG entry if required[ ] Determined and documented upgrade steps[ ] Documented user facing changes (create PR based on openshift/openshift-docs and merge into rhacs-docs)Testing Performed