ROX-29755: Recover reprocessing when Sensors are 'stuck' #15802
Conversation
Skipping CI for Draft Pull Request.
Images are ready for the commit at 7372f15. To use with deploy scripts, first |
Codecov Report. Attention: Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##           master   #15802      +/-  ##
==========================================
- Coverage   48.80%   48.79%   -0.02%
==========================================
  Files        2590     2593       +3
  Lines      190593   190797     +204
==========================================
+ Hits        93023    93098      +75
- Misses      90259    90390     +131
+ Partials     7311     7309       -2
```
Flags with carried forward coverage won't be shown.
Will trigger a panic in non-release builds when combined with logging.ImageID, logging.ImageName, etc.
Force-pushed from 49c8073 to 620fde5.
/test all

/test all
Hey @dcaravel - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `central/reprocessor/reprocessor.go:486` </location>
<code_context>
+// injectMessage will inject a message onto connection, an error will be returned if the
+// injection fails for any reason, including timeout.
+func (l *loopImpl) injectMessage(ctx context.Context, conn connection.SensorConnection, msg *central.MsgToSensor) error {
+ if l.injectMessageTimeoutDur > 0 {
+ var cancel context.CancelFunc
</code_context>
<issue_to_address>
Consider using the passed context for timeout only if it is not already deadline-bound.
Wrapping a context with an existing deadline in another timeout may cause issues. Check for an existing deadline before applying injectMessageTimeoutDur.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
if l.injectMessageTimeoutDur > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, l.injectMessageTimeoutDur)
defer cancel()
}
=======
if l.injectMessageTimeoutDur > 0 {
if _, hasDeadline := ctx.Deadline(); !hasDeadline {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, l.injectMessageTimeoutDur)
defer cancel()
}
}
>>>>>>> REPLACE
</suggested_fix>
### Comment 2
<location> `central/reprocessor/reprocessor_unit_test.go:346` </location>
<code_context>
+ testLoop.reprocessImagesAndResyncDeployments(0, reprocessFuncUpdate, nil)
+ })
+
+	t.Run("skip some messages when there are broken clusters", func(t *testing.T) {
+ testLoop, connA, connB := newReprocessorLoop(t)
+
+ // Cluster "a" is healthy, expect all applicable images to be sent.
+ connA.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).Times(len(imgs) / 2).Return(nil)
+ // Cluster "b" is not healthy, expect at MOST imageReprocessorSemaphoreSize attempted messages.
+ connB.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).MaxTimes(int(imageReprocessorSemaphoreSize)).Return(errors.New("broken"))
+
+ connA.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(1).Return(nil)
+ // No reprocess deployments message is sent due to previous failures.
+ connB.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(0).Return(nil)
+
+ testLoop.reprocessImagesAndResyncDeployments(0, reprocessFuncUpdate, nil)
+ })
+}
</code_context>
<issue_to_address>
Consider asserting the number of skipped messages for broken clusters.
Additionally, please assert that the skip counter metric is incremented correctly to validate the metric logic.
Suggested implementation:
```golang
t.Run("skip some messages when there are broken clusters", func(t *testing.T) {
testLoop, connA, connB := newReprocessorLoop(t)
// Cluster "a" is healthy, expect all applicable images to be sent.
connA.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).Times(len(imgs) / 2).Return(nil)
// Cluster "b" is not healthy, expect at MOST imageReprocessorSemaphoreSize attempted messages.
connB.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).MaxTimes(int(imageReprocessorSemaphoreSize)).Return(errors.New("broken"))
connA.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(1).Return(nil)
// No reprocess deployments message is sent due to previous failures.
connB.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(0).Return(nil)
// Capture skip metric before
skippedBefore := imageReprocessorSkippedCounter.Value()
testLoop.reprocessImagesAndResyncDeployments(0, reprocessFuncUpdate, nil)
// Calculate expected skipped messages for broken cluster "b"
expectedSkipped := (len(imgs) / 2) - int(imageReprocessorSemaphoreSize)
if expectedSkipped < 0 {
expectedSkipped = 0
}
// Assert skip metric incremented correctly
skippedAfter := imageReprocessorSkippedCounter.Value()
if int(skippedAfter-skippedBefore) != expectedSkipped {
t.Errorf("expected %d skipped messages, got %d", expectedSkipped, int(skippedAfter-skippedBefore))
}
})
```
- If `imageReprocessorSkippedCounter.Value()` is not the correct way to access the metric, replace it with the appropriate getter for your metrics library.
- If the skip counter is not global, you may need to access it via `testLoop` or another struct.
- If the metric is not implemented, you will need to add it to the production code and increment it when a message is skipped.
</issue_to_address>
### Comment 3
<location> `central/reprocessor/reprocessor_unit_test.go` </location>
<code_context>
+ t.Run("use timeout when duration set", func(t *testing.T) {
</code_context>
<issue_to_address>
Consider adding a test for injectMessage error propagation.
Add a test case where InjectMessage returns an error to verify proper error wrapping and propagation.
</issue_to_address>
janisz
left a comment
I like the idea, but I'm not sure this is the right place to add a circuit breaker, as it will only fix image reprocessing. We need to rethink sensor<->central communication and health checking.
lvalerom
left a comment
The changes look good. I also agree we should make the central <-> sensor communication more robust to these kinds of things.
[nit] I was a little confused at first as to why we were incrementing the skip_count when there was an error. Should we split the skip_count metric in two, so as not to mix messages that failed to be delivered (due to an error) with messages that were deliberately skipped (Central does not attempt to send anything due to previous errors)? Or maybe rename it?
I'll separate out the metrics (may use an additional label).
/test gke-operator-e2e-tests
The backport to release-4.6 failed. To backport manually, run these commands in your terminal:

```shell
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-4.6 release-4.6
# Navigate to the new working tree
cd .worktrees/backport-release-4.6
# Create a new branch
git switch --create backport-15802-to-release-4.6
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 933c55842eb6a2abebe018120d3a3ca7e88c0c11
# Push it to GitHub
git push --set-upstream origin backport-15802-to-release-4.6
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-4.6
```

Then, create a pull request targeting release-4.6 from the backport-15802-to-release-4.6 branch.
The backport to release-4.8 failed. To backport manually, run these commands in your terminal:

```shell
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-4.8 release-4.8
# Navigate to the new working tree
cd .worktrees/backport-release-4.8
# Create a new branch
git switch --create backport-15802-to-release-4.8
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 933c55842eb6a2abebe018120d3a3ca7e88c0c11
# Push it to GitHub
git push --set-upstream origin backport-15802-to-release-4.8
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-4.8
```

Then, create a pull request targeting release-4.8 from the backport-15802-to-release-4.8 branch.
The backport to release-4.7 failed. To backport manually, run these commands in your terminal:

```shell
# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-4.7 release-4.7
# Navigate to the new working tree
cd .worktrees/backport-release-4.7
# Create a new branch
git switch --create backport-15802-to-release-4.7
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 933c55842eb6a2abebe018120d3a3ca7e88c0c11
# Push it to GitHub
git push --set-upstream origin backport-15802-to-release-4.7
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-4.7
```

Then, create a pull request targeting release-4.7 from the backport-15802-to-release-4.7 branch.
Description
Adds a timeout to the `UpdatedImage` and `ReprocessDeployments` messages sent from Central to Sensor during reprocessing. When the timeout triggers, the reprocessor sends no further messages to the failing Sensor until the next reprocessing cycle, at which point sending resumes. A metric is added to count the number of skipped messages while in this state.

The timeout chosen (`1m`) was an arbitrary guess; please share if you have data / opinions on what this value should be.

(Also removed some dead code.)
The motivation for this change was observed reprocessing not happening in user environments. In one case the cause was a single Sensor (out of hundreds) not reading from the gRPC bi-directional stream, causing gRPC's flow control mechanism to block forever; this in turn blocked Central reprocessing for all clusters.
This change only 'fixes' reprocessing; other ACS functions (such as generating diagnostic bundles with Sensor data) will likely NOT work while the Sensor is in this stuck state.
User-facing documentation
Testing and quality
Automated testing
How I validated my change
Set a low reprocessing interval (30s) and, using a custom Sensor image that stops reading from the stream when triggered, observed the Central reprocessor stalling and then recovering.
There will be at most 5 error logs/events per cluster (one for each goroutine that times out); the rest of the logs are debug:
Metrics: