
ROX-9022: add type to make requests to a retryable source with asynchronous interface #383

Closed
juanrh wants to merge 4 commits into master from juanrh/ROX-9022

Conversation

Contributor

@juanrh juanrh commented Jan 20, 2022

Description

Create a type to make requests to a retryable source with an asynchronous interface, with a timeout per request.
This would be used by the CertManager (final name TBD) to reliably request the TLS certificates for a set of components.

Previous attempts

I started with this, but the problem is that it doesn't retry on timeouts, which is essential for using this to fetch values through the Communicate RPC in Central, which is bidirectional and has no timeouts or request failures implemented:

type result struct {
	v   interface{}
	err error
}

type RetryableSource interface {
	// Request asks the source for a result, and assumes the source will
	// perform the necessary cancellation procedure.
	Request()
	// ResponseC is where the source puts the result: we only care about
	// the first one.
	ResponseC() chan result
}

type backoffRequesterImpl struct {
	source  RetryableSource
	stopC   concurrency.ErrorSignal
	backoff wait.Backoff
}

func (r *backoffRequesterImpl) Request(parentCtx context.Context, timeout time.Duration, errReporter errorReporter) (interface{}, error) {
	r.source.Request()
	ctx, cancel := context.WithTimeout(parentCtx, timeout)
	defer cancel()
	for {
		select {
		case <-r.stopC.Done():
			return nil, errors.New("request cancelled")
		case <-ctx.Done():
			// FIXME retry
			return nil, errors.New("timeout")
		case response := <-r.source.ResponseC():
			if response.err != nil {
				if errReporter != nil {
					errReporter.Report(response.err)
				}
				time.AfterFunc(r.backoff.Step(), func() {
					r.source.Request()
				})
			} else {
				return response.v, nil
			}
		}
	}
}

I tried retrying timeouts, but this doesn't reset the timeout timer when a new request is sent.

// NOTE BLOCKING: loops forever or until you cancel
func (r *backoffRequesterImpl) Request(parentCtx context.Context, timeout time.Duration) (interface{}, error) {
	r.source.Request()
	cancelCtx, cancel := context.WithCancel(parentCtx)
	defer cancel()
	timeoutCtx, _ := context.WithTimeout(cancelCtx, timeout)
	for {
		select {
		case <-r.cancelC.Done():
			return nil, errors.New("request cancelled")
		case <-timeoutCtx.Done():
			r.handleError(errors.New("timeout"))
			timeoutCtx, _ = context.WithTimeout(cancelCtx, timeout)
		case response := <-r.source.ResponseC():
			err := response.err
			if err != nil {
				r.handleError(err)
			} else {
				return response.v, nil
			}
		}
	}
}

func (r *backoffRequesterImpl) handleError(err error) {
	time.AfterFunc(r.backoff.Step(), func() {
		r.source.Request()
	})
}

func (r *backoffRequesterImpl) Cancel() {
	r.cancelC.SignalWithError(errors.New("stop"))
}

This does reset the timeout timer on a new request, but I'm not sure it is safe to modify r.timeoutCtx, which is used in a case of the select, while the select is running. It doesn't look concurrency-safe, but I'm not sure; any insight is appreciated.

type backoffRequesterImpl struct {
	source     RetryableSource
	cancelC    concurrency.ErrorSignal
	backoff    wait.Backoff
	timeoutCtx context.Context
}

// NOTE BLOCKING: loops forever or until you cancel
func (r *backoffRequesterImpl) Request(parentCtx context.Context, timeout time.Duration) (interface{}, error) {
	r.source.Request()
	cancelCtx, cancel := context.WithCancel(parentCtx)
	defer cancel()
	r.timeoutCtx, _ = context.WithTimeout(cancelCtx, timeout)
	for {
		select {
		case <-r.cancelC.Done():
			// Assume the response will never come. We take the first value in
			// `r.source.ResponseC()` as the response, and assume `r.source.Request()`
			// does any cancellation required.
			return nil, errors.New("request cancelled")
		case <-r.timeoutCtx.Done():
			r.handleError(cancelCtx, timeout, errors.New("timeout"))
			// r.timeoutCtx, _ = context.WithTimeout(cancelCtx, timeout)
		case response := <-r.source.ResponseC():
			err := response.err
			if err != nil {
				r.handleError(cancelCtx, timeout, err)
			} else {
				return response.v, nil
			}
		}
	}
	// TODO: wait for parentCtx and Cancel if done; consider making that the way
	// to cancel instead, so no `cancelC concurrency.ErrorSignal` is needed.
}

func (r *backoffRequesterImpl) handleError(cancelCtx context.Context, timeout time.Duration, err error) {
	// BAD: starts timeout before request: `r.timeoutCtx, _ = context.WithTimeout(cancelCtx, timeout)`
	r.timeoutCtx = cancelCtx
	time.AfterFunc(r.backoff.Step(), func() {
		r.source.Request()
		// BAD: concurrently modifies the timeout; the Request loop might not catch it
		r.timeoutCtx, _ = context.WithTimeout(cancelCtx, timeout)
	})
}

func (r *backoffRequesterImpl) Cancel() {
	r.cancelC.SignalWithError(errors.New("stop"))
}

The current code uses a channel and a timer for timeouts instead of contexts. I also replaced the signal with a context for cancellation, and added a global timeout for the whole process.

Checklist

  • Investigated and inspected CI test results
  • Unit test and regression tests added
  • Evaluated and added CHANGELOG entry if required
  • Determined and documented upgrade steps

If any of these don't apply, please comment below.

Testing Performed

TODO(replace-me)

Juan Rodriguez Hortala added 2 commits January 20, 2022 18:43
Type to make requests to a retryable source that has an
asynchronous interface, with timeout per request, and
configurable backoff

ghost commented Jan 20, 2022

Tag for build #128932 is 3.68.x-6-g29f5cf6a2e.

💻 For deploying this image using the dev scripts, run the following first:

export MAIN_IMAGE_TAG='3.68.x-6-g29f5cf6a2e'

📦 You can also generate an installation bundle with:

docker run -i --rm stackrox/main:3.68.x-6-g29f5cf6a2e central generate interactive > bundle.zip

🕹️ A roxctl binary artifact can be downloaded from CircleCI.

"k8s.io/apimachinery/pkg/util/wait"
)

// RetryableSource is a value that allows asking for a result, and returns the
Contributor

Can you provide an existing example of something that will provide an interface similar to this?

It seems strange that we are:

  1. not propagating the context, and
  2. the usage model is reading from the channel and cracking the AskForResult whip from time to time?

If our notion of timeout does not match the source's notion, this could lead to strange situations I think...

It's a little hard to think about how the Retriever would work without well-defined source semantics 🤔

Contributor Author

@juanrh juanrh Jan 21, 2022

I've added a couple of commits that change the interface a bit.

The idea now is to call AskForResult to ask for a result and get a new channel just for that result, and to call Retry if the result is not obtained in time. I also added passing the context to AskForResult.

type RetryableSource interface {
	AskForResult(ctx context.Context) chan *Result
	Retry()
}

The idea is that the source can forget that you made a request. An example of a RetryableSource would be a SensorComponent that communicates with Central through the Communicate RPC. Sensor side, this is implemented by registering a bunch of SensorComponents that get messages to / from Central as follows:

  • Send to Central: each SensorComponent has a method ResponsesC() <-chan *central.MsgFromSensor that centralSenderImpl listens on, forwarding all messages it finds to Central.
  • Receive from Central: each SensorComponent has a method ProcessMessage(msg *central.MsgToSensor) error that centralReceiverImpl calls each time it gets a message from Central. Each message is broadcast to all SensorComponents.

Sending messages through Communicate is fire and forget, so this is what I've come up with to encapsulate timeouts and retries.
For the local scanner we'd have a SensorComponent that sends an IssueLocalScannerCertsRequest to ask Central for certificates. On AskForResult it would send that request to Central and create a new chan *Result, and on Communicate it would send the response message to that channel. On Retry it would send a new request to Central, and update an internal request ID that is used in IssueLocalScannerCertsRequest and IssueLocalScannerCertsResponse to correlate requests and responses.

Regarding

If our notion of timeout does not match the source's notion, this could lead to strange situations I think...

I don't get what you mean by that.

Contributor Author

As a somewhat related precedent, this is similar to Erlang's gen_server call, but with retries, although the language is quite different.

Contributor Author

See example of certificate source in draft PR #400

Contributor

By saying:

If our notion of timeout does not match the source's notion, this could lead to strange situations I think...

I meant that it is not clear what the relation is between the ctx and the recommended wait period between calls to Retry().

Also, unfortunately I got somewhat lost when reading #400, and the things that were most interesting to me, like how the contexts are propagated, do not seem to be finished yet.
Maybe I should just wait until both PRs are a bit more fleshed out...

Contributor Author

@juanrh juanrh Jan 24, 2022

I've been able to simplify #400 by applying @SimonBaeumer's suggestions. The code in this PR is not used anymore; please take a look when you have some time.
I'm also putting #350 on hold in favor of #400.

Contributor Author

juanrh commented Jan 25, 2022

closed in favor of #350

@juanrh juanrh closed this Jan 25, 2022
@msugakov msugakov deleted the juanrh/ROX-9022 branch September 16, 2025 18:41