ROX-9022: add type to make requests to a retryable source with asynchronous interface#383
Type to make requests to a retryable source that has an asynchronous interface, with timeout per request, and configurable backoff
Tag for build #128932 is
💻 For deploying this image using the dev scripts, run the following first: export MAIN_IMAGE_TAG='3.68.x-6-g29f5cf6a2e'
📦 You can also generate an installation bundle with: docker run -i --rm stackrox/main:3.68.x-6-g29f5cf6a2e central generate interactive > bundle.zip
pkg/retry/retry_source.go
Outdated
"k8s.io/apimachinery/pkg/util/wait"
)

// RetryableSource is a value that allows asking for a result, and returns the
Can you provide an existing example of something that will provide an interface similar to this?
It seems strange that:
- we are not propagating the context, and
- the usage model is reading from the channel and cracking the AskForResult whip from time to time.
If our notion of timeout does not match the source's notion, this could lead to strange situations I think...
It's a little hard to think about how the Retriever would work without having well defined source semantics 🤔
I've added a couple of commits that change the interface a bit.
The idea now is to call AskForResult to ask for a result and get a new channel just for that result, and to call Retry if the result is not obtained in time. AskForResult now also takes the context.
	type RetryableSource interface {
		AskForResult(ctx context.Context) chan *Result
		Retry()
	}

The idea is that the source can forget that you made a request. An example of a RetryableSource would be a SensorComponent that communicates with Central through the Communicate RPC. On the Sensor side this is implemented by registering a set of SensorComponents that exchange messages with Central as follows:
- Send to Central: each SensorComponent has a method ResponsesC() <-chan *central.MsgFromSensor that centralSenderImpl listens on, forwarding every message it finds to Central.
- Receive from Central: each SensorComponent has a method ProcessMessage(msg *central.MsgToSensor) error that centralReceiverImpl calls each time it gets a message from Central. Each message is broadcast to all SensorComponents.
Sending messages through Communicate is fire and forget, so this is what I've come up with to encapsulate timeouts and retries.
For the local scanner we'd have a SensorComponent that sends an IssueLocalScannerCertsRequest to ask Central for certificates. On AskForResult it would send that request to Central and create a new chan *Result, and on Communicate it would send the message to that channel. On Retry it would send a new request to Central and update an internal request ID that is used in IssueLocalScannerCertsRequest and IssueLocalScannerCertsResponse to correlate requests and responses.
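The request ID correlation described above could look roughly like the following. This is only a sketch under assumptions, not the code in this PR or in #400: the Result fields, the request and response structs (stand-ins for IssueLocalScannerCertsRequest and IssueLocalScannerCertsResponse), certSource, and the outbox channel (playing the role of ResponsesC()) are all hypothetical, and the sketch ignores ctx.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Result is a hypothetical stand-in for the Result type of this PR.
type Result struct {
	Value string
	Err   error
}

// RetryableSource is the interface quoted in the comment above.
type RetryableSource interface {
	AskForResult(ctx context.Context) chan *Result
	Retry()
}

// request and response are hypothetical stand-ins for
// IssueLocalScannerCertsRequest and IssueLocalScannerCertsResponse.
type request struct{ ID int }
type response struct {
	ID    int
	Certs string
}

// certSource is a hypothetical fire-and-forget source that correlates
// responses with the most recent request ID.
type certSource struct {
	mu      sync.Mutex
	lastID  int
	resultC chan *Result
	outbox  chan request // assumption: plays the role of ResponsesC()
}

func newCertSource() *certSource {
	return &certSource{outbox: make(chan request, 16)}
}

// AskForResult creates a fresh result channel and sends a first request.
// The context is ignored in this sketch.
func (s *certSource) AskForResult(_ context.Context) chan *Result {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.resultC = make(chan *Result, 1)
	s.sendLocked()
	return s.resultC
}

// Retry sends a new request with a new ID, so older responses become stale.
func (s *certSource) Retry() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sendLocked()
}

// sendLocked must be called with s.mu held. It blocks if outbox is full,
// which a real implementation would need to handle.
func (s *certSource) sendLocked() {
	s.lastID++
	s.outbox <- request{ID: s.lastID}
}

// ProcessMessage delivers a response only if it answers the latest request.
func (s *certSource) ProcessMessage(resp response) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.resultC == nil || resp.ID != s.lastID {
		return nil // stale or unsolicited response: ignore it
	}
	select {
	case s.resultC <- &Result{Value: resp.Certs}:
	default: // a result was already delivered for this request
	}
	return nil
}

// demo simulates one timed-out request followed by a retry.
func demo() (string, int) {
	src := newCertSource()
	resultC := src.AskForResult(context.Background())
	src.Retry()                                             // pretend request 1 timed out
	_ = src.ProcessMessage(response{ID: 1, Certs: "stale"}) // ignored: old ID
	_ = src.ProcessMessage(response{ID: 2, Certs: "fresh"}) // delivered
	res := <-resultC
	return res.Value, len(src.outbox)
}

func main() {
	value, sent := demo()
	fmt.Println(value, sent)
}
```

Dropping responses whose ID is not the latest is one possible policy; a source could also accept a late response to an earlier request, since it carries the same kind of payload.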
Regarding:
If our notion of timeout does not match the source's notion, this could lead to strange situations I think...
I don't understand what you mean by that.
As a somewhat related precedent, this is similar to Erlang's gen_server call but with retries, although the language is quite different.
See example of certificate source in draft PR #400
By saying:
If our notion of timeout does not match the source's notion, this could lead to strange situations I think...
I meant that it is not clear what the relation is between the ctx and the recommended wait period between calls to Retry().
Also, unfortunately I got somewhat lost when reading #400, and the things that were most interesting to me, like how the contexts are propagated, do not seem to be finished yet.
Maybe I should just wait until both PRs are a bit more fleshed out...
I've been able to simplify #400 by applying @SimonBaeumer's suggestions. The code in this PR is not used anymore; please take a look when you have some time.
I'm also putting #350 on hold in favor of #400
closed in favor of #350
Description
Create a type to make requests to a retryable source with an asynchronous interface, with a timeout per request.
This would be used by the CertManager (final name TBD) to reliably request the TLS certificates for a set of components.
Previous attempts
- I started with this, but the problem is that it doesn't retry on timeouts, which is essential for fetching values through the Communicate RPC in central, which is bidirectional and has no timeouts or request failures implemented.
- I tried retrying timeouts, but this doesn't reset the timeout timer when a new request is sent.
- This does reset the timeout timer on a new request, but I'm not sure it is safe to modify r.timeoutCtx, which is used in a case in the select, while the select is running. It doesn't look concurrency-safe, but I'm not sure; any insight is appreciated.
- The current code uses a channel and a timer for timeouts instead of contexts. I also replaced the signal with a context for cancellation and a global timeout for the whole process.
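To illustrate the timer-based approach mentioned above, here is a minimal sketch of a retry loop that uses a time.Timer for the per-request timeout and a context for cancellation and the global timeout. It is not the code in this PR: retrieve, flakySource, the Result fields, and the doubling backoff are assumptions made for illustration. Because the timer is only reset after it has fired, nothing that a running select depends on is swapped out.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Result is a hypothetical stand-in for the Result type of this PR.
type Result struct {
	Value string
	Err   error
}

// RetryableSource is the interface discussed in the conversation.
type RetryableSource interface {
	AskForResult(ctx context.Context) chan *Result
	Retry()
}

// retrieve (hypothetical name) asks the source for a result and calls Retry
// each time requestTimeout elapses without one. ctx cancels the whole
// process, so its deadline acts as the global timeout.
func retrieve(ctx context.Context, source RetryableSource, requestTimeout, backoff time.Duration) (*Result, error) {
	resultC := source.AskForResult(ctx)
	timer := time.NewTimer(requestTimeout)
	defer timer.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case res := <-resultC:
			return res, nil
		case <-timer.C:
			// Per-request timeout: wait for the backoff period, still
			// accepting a late result or a cancellation.
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case res := <-resultC:
				return res, nil
			case <-time.After(backoff):
			}
			source.Retry()
			backoff *= 2 // assumption: simple exponential backoff
			// The timer has fired and its channel was drained above,
			// so Reset is safe here.
			timer.Reset(requestTimeout)
		}
	}
}

// flakySource is a hypothetical source that only answers the Nth request.
type flakySource struct {
	resultC  chan *Result
	requests int
	needed   int
}

func (s *flakySource) AskForResult(_ context.Context) chan *Result {
	s.resultC = make(chan *Result, 1)
	s.request()
	return s.resultC
}

func (s *flakySource) Retry() { s.request() }

func (s *flakySource) request() {
	s.requests++
	if s.requests == s.needed {
		s.resultC <- &Result{Value: "certs"}
	}
}

// demo runs retrieve against a source that ignores the first two requests.
func demo() (string, int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	src := &flakySource{needed: 3}
	res, err := retrieve(ctx, src, 10*time.Millisecond, time.Millisecond)
	if err != nil {
		return "", src.requests, err
	}
	return res.Value, src.requests, nil
}

func main() {
	value, requests, err := demo()
	fmt.Println(value, requests, err)
}
```

In this sketch the per-request timeout and the backoff are independent of ctx, which only bounds the whole call; that is one possible answer to the question about how the two notions of timeout relate.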
Checklist
If any of these don't apply, please comment below.
Testing Performed
TODO(replace-me)