
ROX-29755: Recover reprocessing when Sensors are 'stuck' #15802

Merged
dcaravel merged 16 commits into master from dc/no-stuck-img-reproc on Jul 3, 2025

Conversation

@dcaravel
Contributor

@dcaravel dcaravel commented Jun 20, 2025

Description

Adds a timeout to the UpdatedImage and ReprocessDeployments messages sent from Central to Sensor during reprocessing.

When the timeout triggers, the reprocessor stops sending messages to the failing Sensor; sending resumes on the next reprocessing cycle. A new metric counts the messages skipped while in this state.

The timeout chosen (1m) was an arbitrary guess; please share if you have data or opinions on what this value should be.

(also removed some dead code)

The motivation for this change was observed reprocessing not happening in user environments. In one case the cause was a single Sensor (out of hundreds) not reading from the gRPC bidirectional stream, causing gRPC's flow-control mechanism to block forever; this in turn blocked Central reprocessing for all clusters.

This change only 'fixes' reprocessing; other ACS functions that depend on the stream will likely NOT work while the Sensor is in this stuck state (such as generating diagnostic bundles with Sensor data).

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • modified existing tests

How I validated my change

Set a low reprocessing interval (30s) and, using a custom Sensor image that stops reading from the stream when triggered, observed the Central reprocessor stalling and then recovering.

There will be at most 5 error logs/events per cluster (one for each goroutine that times out); the rest of the logs are at debug level:

reprocessor: 2025/06/24 21:39:35.132427 reprocessor.go:424: Error: Error sending updated image to cluster, skipping cluster until next reprocessing cycle {"image": "quay.io/rhacs-eng/collector:4.9.x-115-gcc527084cc", "image_id": "sha256:14814d797f1340eae34eb0aeb573abca0b40d6eda5eac8fb03829af5d584d832", "error": "error injecting message to sensor: context aborted: context deadline exceeded", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:35.430750 reprocessor.go:424: Error: Error sending updated image to cluster, skipping cluster until next reprocessing cycle {"image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:4d5b14fb960b805b4dd220e9869fb95446166f8be2ba3a7db2f66dda4a01497f", "image_id": "sha256:4d5b14fb960b805b4dd220e9869fb95446166f8be2ba3a7db2f66dda4a01497f", "error": "error injecting message to sensor: context aborted: context deadline exceeded", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:36.015179 reprocessor.go:404: Debug: Not sending updated image to cluster due to prior errors {"image_id": "sha256:d40192d6ba8e601f75891fd58d5cf95b448c64d42c58151c57fb54ecc5bbaeed", "image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:d40192d6ba8e601f75891fd58d5cf95b448c64d42c58151c57fb54ecc5bbaeed", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:36.036657 reprocessor.go:424: Error: Error sending updated image to cluster, skipping cluster until next reprocessing cycle {"image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:4e35d069edbf728370f143c91f521e44fd0b1b4726b9ac2528ffd90cf45951c1", "image_id": "sha256:4e35d069edbf728370f143c91f521e44fd0b1b4726b9ac2528ffd90cf45951c1", "error": "error injecting message to sensor: context aborted: context deadline exceeded", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:36.103750 reprocessor.go:424: Error: Error sending updated image to cluster, skipping cluster until next reprocessing cycle {"image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:98b9b1d008214073849d95ec6aa35a47d3108e3f7436f0d823243e399f8e6f4a", "image_id": "sha256:98b9b1d008214073849d95ec6aa35a47d3108e3f7436f0d823243e399f8e6f4a", "error": "error injecting message to sensor: context aborted: context deadline exceeded", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:36.170544 reprocessor.go:424: Error: Error sending updated image to cluster, skipping cluster until next reprocessing cycle {"image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:bff86abe981ae01a5dc5c9d78aa281856c1709cde7efc2694e8fedd960acf9db", "image_id": "sha256:bff86abe981ae01a5dc5c9d78aa281856c1709cde7efc2694e8fedd960acf9db", "error": "error injecting message to sensor: context aborted: context deadline exceeded", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:36.953084 reprocessor.go:404: Debug: Not sending updated image to cluster due to prior errors {"image_id": "sha256:183be2dd687286d6b6dd93484d571dd77fdc1a5ab314702d0da9c51ba1324445", "image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:183be2dd687286d6b6dd93484d571dd77fdc1a5ab314702d0da9c51ba1324445", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:37.168298 reprocessor.go:404: Debug: Not sending updated image to cluster due to prior errors {"image_id": "sha256:4f8ab6ed21a306fb05d2b2f6b340d6e36c1699954fd3f2fadc30d6154b267dda", "image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:4f8ab6ed21a306fb05d2b2f6b340d6e36c1699954fd3f2fadc30d6154b267dda", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:37.264409 reprocessor.go:404: Debug: Not sending updated image to cluster due to prior errors {"image_id": "sha256:5205ad4cb639bc5449eb5408ae42c16e2d95b54bf2298ec9db95e84952c3c37d", "image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:5205ad4cb639bc5449eb5408ae42c16e2d95b54bf2298ec9db95e84952c3c37d", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:37.289206 reprocessor.go:404: Debug: Not sending updated image to cluster due to prior errors {"image_id": "sha256:6b5eb63bfc10f1510a4b160034545c1e8439d17e305ee1dca6055bb27c9065da", "image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:6b5eb63bfc10f1510a4b160034545c1e8439d17e305ee1dca6055bb27c9065da", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}
reprocessor: 2025/06/24 21:39:37.306696 reprocessor.go:404: Debug: Not sending updated image to cluster due to prior errors {"image_id": "sha256:eec1e20212eb1eba41af4972b335e0b3706ef6dfd3ab6d76f701431900b101e9", "image": "quay.io/openshift-release-dev/ocp-v4.0-art-dev@sha256:eec1e20212eb1eba41af4972b335e0b3706ef6dfd3ab6d76f701431900b101e9", "dst_cluster": "788b76eb-34ab-44be-9200-fdbd42431172"}

Metrics:

# HELP rox_central_msg_to_sensor_not_sent_count Total messages not sent to Sensor due to errors or other reasons
# TYPE rox_central_msg_to_sensor_not_sent_count counter
rox_central_msg_to_sensor_not_sent_count{ClusterID="4005c535-3da8-45c1-a551-41b2883b40f8",reason="error",type="UpdatedImage"} 1
rox_central_msg_to_sensor_not_sent_count{ClusterID="4005c535-3da8-45c1-a551-41b2883b40f8",reason="signal",type="ClusterHealthResponse"} 1
rox_central_msg_to_sensor_not_sent_count{ClusterID="4005c535-3da8-45c1-a551-41b2883b40f8",reason="signal",type="UpdatedImage"} 59
rox_central_msg_to_sensor_not_sent_count{ClusterID="4005c535-3da8-45c1-a551-41b2883b40f8",reason="skip",type="ReprocessDeployments"} 11
rox_central_msg_to_sensor_not_sent_count{ClusterID="4005c535-3da8-45c1-a551-41b2883b40f8",reason="skip",type="UpdatedImage"} 975

@openshift-ci

openshift-ci bot commented Jun 20, 2025

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@rhacs-bot
Contributor

rhacs-bot commented Jun 20, 2025

Images are ready for the commit at 7372f15.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.9.x-109-g7372f156f5.

@codecov

codecov bot commented Jun 20, 2025

Codecov Report

Attention: Patch coverage is 80.55556% with 14 lines in your changes missing coverage. Please review.

Project coverage is 48.79%. Comparing base (f1946cc) to head (7372f15).
Report is 100 commits behind head on master.

Files with missing lines Patch % Lines
central/reprocessor/reprocessor.go 85.18% 7 Missing and 1 partial ⚠️
...ntral/sensor/service/connection/connection_impl.go 0.00% 6 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #15802      +/-   ##
==========================================
- Coverage   48.80%   48.79%   -0.02%     
==========================================
  Files        2590     2593       +3     
  Lines      190593   190797     +204     
==========================================
+ Hits        93023    93098      +75     
- Misses      90259    90390     +131     
+ Partials     7311     7309       -2     
Flag Coverage Δ
go-unit-tests 48.79% <80.55%> (-0.02%) ⬇️


@dcaravel dcaravel force-pushed the dc/no-stuck-img-reproc branch from 49c8073 to 620fde5 Compare June 26, 2025 15:45
@dcaravel
Contributor Author

/test all

@dcaravel
Contributor Author

/test all

@dcaravel dcaravel marked this pull request as ready for review June 27, 2025 11:28
@dcaravel dcaravel requested review from a team June 27, 2025 11:29
Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey @dcaravel - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `central/reprocessor/reprocessor.go:486` </location>
<code_context>

+// injectMessage will inject a message onto connection, an error will be returned if the
+// injection fails for any reason, including timeout.
+func (l *loopImpl) injectMessage(ctx context.Context, conn connection.SensorConnection, msg *central.MsgToSensor) error {
+	if l.injectMessageTimeoutDur > 0 {
+		var cancel context.CancelFunc
</code_context>

<issue_to_address>
Consider using the passed context for timeout only if it is not already deadline-bound.

Wrapping a context with an existing deadline in another timeout may cause issues. Check for an existing deadline before applying injectMessageTimeoutDur.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
	if l.injectMessageTimeoutDur > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, l.injectMessageTimeoutDur)
		defer cancel()
	}
=======
	if l.injectMessageTimeoutDur > 0 {
		if _, hasDeadline := ctx.Deadline(); !hasDeadline {
			var cancel context.CancelFunc
			ctx, cancel = context.WithTimeout(ctx, l.injectMessageTimeoutDur)
			defer cancel()
		}
	}
>>>>>>> REPLACE

</suggested_fix>

### Comment 2
<location> `central/reprocessor/reprocessor_unit_test.go:346` </location>
<code_context>
+		testLoop.reprocessImagesAndResyncDeployments(0, reprocessFuncUpdate, nil)
+	})
+
+	t.Run("skip some messages when are broken clusters", func(t *testing.T) {
+		testLoop, connA, connB := newReprocessorLoop(t)
+
+		// Cluster "a" is healthy, expect all applicable images to be sent.
+		connA.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).Times(len(imgs) / 2).Return(nil)
+		// Cluster "b" is not healthy, expect at MOST imageReprocessorSemaphoreSize attempted messages.
+		connB.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).MaxTimes(int(imageReprocessorSemaphoreSize)).Return(errors.New("broken"))
+
+		connA.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(1).Return(nil)
+		// No reprocess deployments message is sent due to previous failures.
+		connB.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(0).Return(nil)
+
+		testLoop.reprocessImagesAndResyncDeployments(0, reprocessFuncUpdate, nil)
+	})
+}
</code_context>

<issue_to_address>
Consider asserting the number of skipped messages for broken clusters.

Additionally, please assert that the skip counter metric is incremented correctly to validate the metric logic.

Suggested implementation:

```golang
	t.Run("skip some messages when are broken clusters", func(t *testing.T) {
		testLoop, connA, connB := newReprocessorLoop(t)

		// Cluster "a" is healthy, expect all applicable images to be sent.
		connA.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).Times(len(imgs) / 2).Return(nil)
		// Cluster "b" is not healthy, expect at MOST imageReprocessorSemaphoreSize attempted messages.
		connB.EXPECT().InjectMessage(gomock.Any(), updatedImageTypeCond).MaxTimes(int(imageReprocessorSemaphoreSize)).Return(errors.New("broken"))

		connA.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(1).Return(nil)
		// No reprocess deployments message is sent due to previous failures.
		connB.EXPECT().InjectMessage(gomock.Any(), reprocessDeploymentsTypeCond).Times(0).Return(nil)

		// Capture skip metric before
		skippedBefore := imageReprocessorSkippedCounter.Value()

		testLoop.reprocessImagesAndResyncDeployments(0, reprocessFuncUpdate, nil)

		// Calculate expected skipped messages for broken cluster "b"
		expectedSkipped := (len(imgs) / 2) - int(imageReprocessorSemaphoreSize)
		if expectedSkipped < 0 {
			expectedSkipped = 0
		}

		// Assert skip metric incremented correctly
		skippedAfter := imageReprocessorSkippedCounter.Value()
		if int(skippedAfter-skippedBefore) != expectedSkipped {
			t.Errorf("expected %d skipped messages, got %d", expectedSkipped, int(skippedAfter-skippedBefore))
		}
	})

```

- If `imageReprocessorSkippedCounter.Value()` is not the correct way to access the metric, replace it with the appropriate getter for your metrics library.
- If the skip counter is not global, you may need to access it via `testLoop` or another struct.
- If the metric is not implemented, you will need to add it to the production code and increment it when a message is skipped.
</issue_to_address>

### Comment 3
<location> `central/reprocessor/reprocessor_unit_test.go` </location>
<code_context>
+		t.Run("use timeout when duration set", func(t *testing.T) {
</code_context>

<issue_to_address>
Consider adding a test for injectMessage error propagation.

Add a test case where InjectMessage returns an error to verify proper error wrapping and propagation.
</issue_to_address>


Contributor

@janisz janisz left a comment


I like the idea, but I'm not sure this is the right place to add a circuit breaker, as it will only fix image reprocessing. We need to rethink Sensor<->Central communication and health checking.

Contributor

@lvalerom lvalerom left a comment


The changes look good. I also agree we should make the Central <-> Sensor communication more robust to this kind of thing.

[nit] I was a little bit confused at the beginning as to why we were incrementing the skip_count when there was an error. Should we separate the skip_count metric into two to not mix messages that failed to be delivered (due to an error) and messages that were deliberately skipped (central does not attempt to send anything due to previous errors)? Or maybe rename it?

@dcaravel
Contributor Author

[nit] I was a little bit confused at the beginning as to why we were incrementing the skip_count when there was an error. Should we separate the skip_count metric into two to not mix messages that failed to be delivered (due to an error) and messages that were deliberately skipped (central does not attempt to send anything due to previous errors)? Or maybe rename it?

I'll separate out the metrics (may use an additional label).

@dcaravel
Contributor Author

@lvalerom what do you think of 7372f15 for separating out the metrics?

@dcaravel dcaravel requested a review from lvalerom June 27, 2025 16:25
@dcaravel
Contributor Author

/test gke-operator-e2e-tests

Contributor

@lvalerom lvalerom left a comment


@lvalerom what do you think of 7372f15 for separating out the metrics?

Nice! Thank you for the changes 🙂

@dcaravel dcaravel merged commit 933c558 into master Jul 3, 2025
107 of 108 checks passed
@dcaravel dcaravel deleted the dc/no-stuck-img-reproc branch July 3, 2025 03:31
@lvalerom lvalerom modified the milestones: 4.8.1-rc.1, 4.7.5-rc.2, 4.6.8-rc.1 Jul 8, 2025
@lvalerom lvalerom added backport release-4.6 Create a PR to backport this PR to release-4.6 backport release-4.7 backport release-4.8 labels Jul 8, 2025
@lvalerom lvalerom removed this from the 4.6.8-rc.1 milestone Jul 8, 2025
@rhacs-bot
Contributor

The backport to release-4.6 failed:

The process '/usr/bin/git' failed with exit code 1

To backport manually, run these commands in your terminal:

# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-4.6 release-4.6
# Navigate to the new working tree
cd .worktrees/backport-release-4.6
# Create a new branch
git switch --create backport-15802-to-release-4.6
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 933c55842eb6a2abebe018120d3a3ca7e88c0c11
# Push it to GitHub
git push --set-upstream origin backport-15802-to-release-4.6
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-4.6

Then, create a pull request where the base branch is release-4.6 and the compare/head branch is backport-15802-to-release-4.6.

@rhacs-bot
Contributor

The backport to release-4.8 failed:

The process '/usr/bin/git' failed with exit code 1

To backport manually, run these commands in your terminal:

# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-4.8 release-4.8
# Navigate to the new working tree
cd .worktrees/backport-release-4.8
# Create a new branch
git switch --create backport-15802-to-release-4.8
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 933c55842eb6a2abebe018120d3a3ca7e88c0c11
# Push it to GitHub
git push --set-upstream origin backport-15802-to-release-4.8
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-4.8

Then, create a pull request where the base branch is release-4.8 and the compare/head branch is backport-15802-to-release-4.8.

@rhacs-bot
Contributor

The backport to release-4.7 failed:

The process '/usr/bin/git' failed with exit code 1

To backport manually, run these commands in your terminal:

# Fetch latest updates from GitHub
git fetch
# Create a new working tree
git worktree add .worktrees/backport-release-4.7 release-4.7
# Navigate to the new working tree
cd .worktrees/backport-release-4.7
# Create a new branch
git switch --create backport-15802-to-release-4.7
# Cherry-pick the merged commit of this pull request and resolve the conflicts
git cherry-pick -x --mainline 1 933c55842eb6a2abebe018120d3a3ca7e88c0c11
# Push it to GitHub
git push --set-upstream origin backport-15802-to-release-4.7
# Go back to the original working tree
cd ../..
# Delete the working tree
git worktree remove .worktrees/backport-release-4.7

Then, create a pull request where the base branch is release-4.7 and the compare/head branch is backport-15802-to-release-4.7.

lvalerom pushed a commit that referenced this pull request Jul 8, 2025
lvalerom pushed a commit that referenced this pull request Jul 8, 2025
lvalerom pushed a commit that referenced this pull request Jul 8, 2025
lvalerom pushed a commit that referenced this pull request Jul 8, 2025
lvalerom pushed a commit that referenced this pull request Jul 8, 2025
lvalerom added a commit that referenced this pull request Jul 9, 2025
…15932)

Co-authored-by: David Caravello <119438707+dcaravel@users.noreply.github.com>
lvalerom added a commit that referenced this pull request Jul 9, 2025
…15928)

Co-authored-by: David Caravello <119438707+dcaravel@users.noreply.github.com>
lvalerom added a commit that referenced this pull request Jul 9, 2025
…15930)

Co-authored-by: David Caravello <119438707+dcaravel@users.noreply.github.com>