ROX-12834: Introduce fake nodescan generator #3399
Conversation

Images are ready for the commit at 707ccf2.
vikin91
left a comment
It looks really good! I was able to run it and see it working. Leaving my comments for now and will finish the review tomorrow (I want to give it a final look).
```go
client, config, err := initializeStream(ctx, cli)
if err != nil {
	if ctx.Err() != nil {
		// continue and the <-ctx.Done() path should be taken next iteration
		continue
	}
	log.Fatalf("error initializing stream to sensor: %v", err)
}
```
I wonder why this was put into the default case in the first place 🤔 Keeping it here, the code would have been called one or more times. Now we would call the initialization only once. Not sure whether this is a problem, but we should keep an eye on the stability of the stream.
For example: would we be able to reestablish the connection if we crash sensor while compliance is still working? That is something to check.
Yes, the crash of sensor causes the connection to break and there is no self-healing. We need to fix that by reverting the change and merging the two loops by adding a new case to the for-switch loop.
It seems to me the code before the change would shut down Compliance when the connection to Sensor is lost.
Piotr, in your experiment did you check behavior of new or of unchanged code?
I checked the behavior from after the change that I commented on. I also saw the fatal message, but I couldn't see compliance crashing after the sensor was gone. I guess we would need to check this on the original code from before this PR.
I have reverted to the original code and also implemented a similar mechanic for the NodeScan loop.
I tested this by killing the sensor pod while compliance is running and was able to verify that both loops connect to (the newly created) sensor again.
msugakov
left a comment
Quick and shallow check, did not go through all code.
compliance/collection/main.go
```go
if err != nil {
	return errors.Wrap(err, "error scanning node")
}
```
I don't think there's a need to wrap error another time (see the calling code)
Suggested change:

```go
if err != nil {
	return err
}
```
```go
func VerifyAndUpdateDuration(duration time.Duration) time.Duration {
	if (duration) <= 0 {
		log.Warn("Negative or zero duration found. Setting to 4 hours.")
		return time.Hour * 4
```
time.Hour * 4 is very specific for the node rescan interval, therefore this function cannot be presented as a generic "util". I think you should simply define it as GetNodeRescanInterval in pkg/env/node_scan.go. Then, instead of doing this

```go
rescanInterval := complianceUtils.VerifyAndUpdateDuration(env.NodeRescanInterval.DurationSetting())
```

you can simply do this

```go
rescanInterval := env.GetNodeRescanInterval()
```
I don't think we need to worry about this because DurationSetting() already handles this:
```go
// DurationSetting returns the Duration object represented by the environment variable
func (d *DurationSetting) DurationSetting() time.Duration {
	val := os.Getenv(d.envVar)
	if val != "" {
		dur, err := time.ParseDuration(val)
		if err == nil {
			return dur
		}
		log.Warnf("%s is not a valid environment variable for %s, using default value: %s", val, d.envVar, d.defaultDuration.String())
	}
	return d.defaultDuration
}
```
That being said, in another PR, we should probably add a panic in the registerDurationSetting if the default is not valid
ah actually time.ParseDuration does allow for negative durations, so DurationSetting() does not handle this. Perhaps we should include that in the PR above ^
Thanks for linking the PR! We'll keep an eye on it and update our code as soon as your PR is merged.
For now, we want to get this PR merged rather quickly, though.
```go
}
if err := runRecv(ctx, client, config); err != nil {
	log.Errorf("error running recv: %v", err)
}
```
@vikin91 @Maddosaurus
Unless I read the code incorrectly, the original manageStream function does not work the way it is supposed to.
I think its idea should be to check whether the connection got closed (ctx.Done()) and in that case shut down the process (sig.Signal()), or (if the connection is still alive) receive a message from Sensor and respond (runRecv()). The problem is that runRecv() is a blocking function that runs an infinite loop of receive-reply logic. Once the code gets to it, it will run forever until an error happens or an unsupported message arrives.
When the code is in runRecv(), the shutdown logic of manageStream won't be executed normally.
Am I right or am I missing something?
Maybe we should check other places like Sensor or Central for how shutdown-receive-send logic is organized there and copy that logic here. To me it seems that we're having difficulties plugging in scanner logic just because the existing approach in Compliance is not prepared for extensibility.
I definitely want to look at sensor and see how it is done there.
For compliance, I see that client.Recv() is being called on a client that was created with the context (`client, err := cli.Communicate(ctx)`), so I hope that canceling the context will shut down the client and release the lock.
(I used the word "hope" as I have not verified this and do not know the internals of gRPC streams.)
I will do some more reading on this.
So sensor utilizes two clever objects to manage the stream to central: a *grpcUtil.LazyClientConn pointer holding a lazy connection, and a centralclient.CentralConnectionFactory that sets up and monitors the connection and restarts it when it fails. All communication attempts block and wait while the connection is down (no message is lost); the operations resume when the connection is back up.
We could apply this to compliance-sensor as well, but it may deserve a separate refactoring PR. Would it be okay to continue in the current shape and improve this in a separate PR, @msugakov?
The code has now changed a lot. The issues from before are gone, but the new code still doesn't look 100% self-explanatory to me. I think we (I?) need to confidently understand what we do. I would not classify "understanding" as "improvement" or "optimization"; "understanding" is our core skill.
Yes, sorry for adding so many changes after you started the review. I think they were needed to address some obvious drawbacks regarding sending without being sure that the connection is up.
While there are still some bits about the GRPC stream that I would like to understand better, I feel pretty confident that the code handling sending and receiving in this component follows the same principles as in other parts of our code.
compliance/collection/main.go

```go
	continue
}
t.Reset(rescanInterval)
```
This could be an else instead of the continue
I wanted to share also my variant of the grpc handling (rewritten to use channels): #3548 (PR against this branch). Let's pick this PR or #3548 as a short-term solution (they both suffer from the same drawback: a new sub-client will be generated for communication every 4 hours). In the long term, we could aim for a solution similar to the one used between sensor and central.
```go
ScanTime: timestamp.TimestampNow(),
Components: &scannerV1.Components{
	Namespace: "Testme OS",
	OsComponents: []*scannerV1.OSComponent{
```
Since we only care about RHCOS at this time, I'd leave this empty/nil
```go
Version: "2:7.4.629-6.el7.x86_64",
Arch:    "x86_64",
Module:  "FakeMod",
Cpes:    []string{"cpe:/a:redhat:enterprise_linux:7::baseos"},
```
Not very important, but maybe use 8 instead of 7 due to RHCOS
```go
RhelComponents: []*scannerV1.RHELComponent{
	{
		Name:      "vim-minimal",
		Namespace: "rhel:7",
		Module:    "FakeMod",
		Cpes:      []string{"cpe:/a:redhat:enterprise_linux:7::baseos"},
		AddedBy:   "FakeLayer",
		Executables: []*scannerV1.Executable{
```
Let's ignore executables as well, since those are for Active Vuln Mgmt, and I don't think we plan on having that in the first iteration.
compliance/collection/main.go

```go
case <-ctx.Done():
	return
case <-t.C:
	client, _, err := initializeStream(ctx, cli)
```
I wonder if we should throw this inside of manageStream so we can just reuse the same stream and use the same stop signals
We tried this, and it is not trivial because runRecv blocks. I have a variation on this in #3548, but it also uses two loops: one for sending and one for receiving.
- Refactor: Rename package to nodescanv2
- Fix style remarks

Co-authored-by: Piotr Rygielski <114479+vikin91@users.noreply.github.com>

- Rename manageNodeScanLoop
- Move scanner injection to function call
- Use real timestamp in fake node scanner message generation
- Extend fake message with more components
- Remove commented out code

Co-authored-by: Misha Sugakov <537715+msugakov@users.noreply.github.com>
I have rebased the PR for a final time to benefit from the latest CI flake fixes.

@Maddosaurus shall we have another CI run with the new ci-labels that Misha added?

/test all
```go
client, _, err := initializeStream(ctx, cli)
if err != nil && ctx.Err() == nil {
	// error even after retries
	log.Fatalf("unable to establish send stream to sensor: %v", err)
}
if err := client.Send(sc); err != nil {
	log.Errorf("failed sending nodeScanV2 to sensor: %v", err)
}
```
I read up about gRPC yesterday and got a bit more dangerous ;-) although still not very educated.
And I'm definitely here to spoil the party that started (even though I haven't used the opportunity to do that earlier). It seems this code will open a stream each time there's something to send. I think gRPC maintains a pool of connections on the client, and streams get multiplexed over HTTP/2 (but only when that's used), so opening a stream doesn't mean establishing a new connection, but I think it still has some overhead.
What is worse, I believe that we reuse the same rpc Communicate as the ordinary Compliance does. An effect of that might be that Sensor will push its messages over that stream too, but on Compliance side there's nothing receiving and acting on them for this stream. In case Sensor pushes a message to perform Compliance checks (message TriggerRun) through this stream, it will get lost.
From looking at Sensor code, it seems to me that the more recent connection (actually stream) replaces the previous one in the map keyed by node name. Therefore, I strongly suspect that what I described actually happens if the feature flag for node scans is turned on, i.e. compliance checks stop working properly.
I don't think it's an issue for now as the feature is gated but I think the logic should be reworked before we are ready to release.
WDYT?
That is a very good find. We should definitely improve this before releasing; it deserves a ticket.
Applying the connection factory with a shared pointer to the stream (as seen in Sensor-Central) should fix the problem you described, but I am not saying this is the only viable solution. I will open a ticket.
A thought about this:
> In case Sensor pushes a message to perform Compliance checks (message TriggerRun) through this stream, it will get lost.
we may close the stream immediately after sending the scan results to notify sensor that this connection should not be used anymore.
> we may close the stream immediately after sending the scan results to notify sensor that this connection should not be used anymore.
I'm afraid the older active connection established by regular compliance logic may not get restored in Sensor's connection map. It deserves some investigation.
I think the problem is that currently Sensor first calls Compliance to do something (after the initial handshake), whereas in our new flow Compliance will first call Sensor. I don't think gRPC bidirectional streams allow receiving and sending at the same time (i.e. full duplex). As a peer in the stream you can either receive or send (or do nothing) but not receive and send concurrently. Therefore the existing stream can only be reused if Sensor initiates scans (going back to one of your proposals, Piotr). We will need a different rpc if, instead, we want Compliance to initiate sending scan results (and Sensor to ack or nack it).
This may be confusing to read; it would probably be more efficient to show what I mean in code.
I think, your idea about Sensors initiating scans deserves to be reconsidered.
> As a peer in the stream you can either receive or send (or do nothing) but not receive and send concurrently.
This would be a limitation. I got the impression from Sensor<->Central that the sending and receiving works concurrently, but I didn't confirm this experimentally.
Having a separate rpc in the proto file would also be an option (maybe even a much cleaner one), but I do not think it is necessary. The way we initiate the stream clients gives us the possibility to send without depending on whether something was received (it is not in the final PR, but we had this variant briefly in the branch).
I am thinking how to design a manual test to confirm or deny our theories 🤔
Created https://issues.redhat.com/browse/ROX-13383 to not forget about the issue we are discussing here.
It turns out I was wrong about no duplex support. Looking at Sensor code, one stream is used for both sending and receiving concurrently.
stackrox/sensor/common/sensor/central_communication_impl.go
Lines 157 to 158 in adcf5c3
> In case Sensor pushes a message to perform Compliance checks (message TriggerRun) through this stream, it will get lost. (@msugakov)
A single manual test is nothing to go by, but I was able to successfully run a compliance check with the FF turned on, so it might be that the message isn't necessarily lost.
But we should definitely investigate this further! Thanks for creating the ticket @vikin91
Description
This PR introduces a generator to send fake NodeScanV2 messages from Collector to Sensor.
These messages enable us to build out the full pipeline independently from implementing the NodeScanV2 logic.
Use `ROX_RHCOS_NODE_SCANNING=true` when calling `deploy.sh` to enable it. The scan interval is configurable via `ROX_NODE_SCAN_INTERVAL`, which defaults to 4 hours.

Checklist
Documented user facing changes (create PR based on openshift/openshift-docs and merge into rhacs-docs)

If any of these don't apply, please comment below.
Testing Performed
1. Deploy (e.g. on local) with the feature flag enabled: `ROX_RHCOS_NODE_SCANNING=true ./deploy/k8s/deploy-local.sh`
2. Observe the log message in the compliance container (in the compliance pod) that the scanning interval is 4 hours.
3. Observe a fake Node Scan message being logged in Sensor logs once at start, then every 4 hours.
4. Set the environment variable via kubectl: `kubectl -n stackrox set env daemonsets/collector --containers="compliance" ROX_NODE_RESCAN_INTERVAL="20s"`
5. Observe the Collector Pod getting reinstantiated and the compliance container log message stating an interval of 20 seconds.
6. Finally, `teardown` everything and redeploy with `deploy.sh` without setting the feature flag.
7. Observe the collector log not stating a Node Scan Interval.
Additional testing: Resilience
We also tested the following error cases: