From f1dac853d8fc3324dbebd95d2f866de399776d31 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 20 Jan 2022 18:43:11 +0100 Subject: [PATCH 1/4] First version of RetryableSourceRetriever Type to make requests to a retryable source that has an asynchronous interface, with timeout per request, and configurable backoff --- pkg/retry/retry_source.go | 95 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 pkg/retry/retry_source.go diff --git a/pkg/retry/retry_source.go b/pkg/retry/retry_source.go new file mode 100644 index 0000000000000..cb7324bac788a --- /dev/null +++ b/pkg/retry/retry_source.go @@ -0,0 +1,95 @@ +package retry + +import ( + "context" + "time" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/wait" +) + +// RetryableSource is a value that allows asking for a result, and returns the +// corresponding result asynchronously. +// Clients only care about the first value returned in ResultC(). +// AskForResult() can be called several times to retry the result computation, the +// RetryableSource is in charge of handling the cancellation of the computation if needed. +type RetryableSource interface { + AskForResult() + ResultC() chan *Result +} + +// Result wraps a pair (result, err) produced by a source. By convention +// either err or v has the zero value of its type. +type Result struct { + v interface {} + err error +} + +// NewRetryableSourceRetriever create a new NewRetryableSourceRetriever +func NewRetryableSourceRetriever(backoff wait.Backoff, requestTimeout time.Duration) *RetryableSourceRetriever { + return &RetryableSourceRetriever{ + RequestTimeout: requestTimeout, + Backoff: backoff, + } +} + +type RetryableSourceRetriever struct { + // time to consider failed a call to AskForResult() that didn't return a result yet. + RequestTimeout time.Duration + ErrReporter func (err error) + // should be reset between calls to Run. + Backoff wait.Backoff + timeoutC chan struct{} + timeoutTimer *time.Timer +} + +// Run gets the result from the specified source. +// Any timeout in ctx is respected. +func (r *RetryableSourceRetriever) Run(ctx context.Context, source RetryableSource) (interface{}, error) { + r.timeoutC = make(chan struct{}) + + source.AskForResult() + r.setTimeoutTimer(r.RequestTimeout) + defer r.setTimeoutTimer(-1) + for { + select { + case <-ctx.Done(): + return nil, errors.New("request cancelled") + case <-r.timeoutC: + // assume result will never come. + r.handleError(errors.New("timeout"), source) + case result := <- source.ResultC(): + err := result.err + if err != nil { + r.handleError(err, source) + } else { + return result.v, nil + } + } + } +} + +func (r *RetryableSourceRetriever) handleError(err error, source RetryableSource) { + if r.ErrReporter != nil { + r.ErrReporter(err) + } + r.setTimeoutTimer(-1) + time.AfterFunc(r.Backoff.Step(), func() { + source.AskForResult() + r.setTimeoutTimer(r.RequestTimeout) + }) +} + +// use negative timeout to just stop the timer. +func (r *RetryableSourceRetriever) setTimeoutTimer(timeout time.Duration) { + if r.timeoutTimer != nil { + r.timeoutTimer.Stop() + } + if timeout >= 0 { + r.timeoutTimer = time.AfterFunc(timeout, func() { + r.timeoutC <- struct{}{} + }) + } else { + r.timeoutTimer = nil + } +} From 211c07d0d16129419bbc3a90f6b7d713d8a5760d Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Thu, 20 Jan 2022 18:47:18 +0100 Subject: [PATCH 2/4] fix style --- pkg/retry/retry_source.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/retry/retry_source.go b/pkg/retry/retry_source.go index cb7324bac788a..80aea01d1ea3f 100644 --- a/pkg/retry/retry_source.go +++ b/pkg/retry/retry_source.go @@ -21,28 +21,29 @@ type RetryableSource interface { // Result wraps a pair (result, err) produced by a source. By convention // either err or v has the zero value of its type. type Result struct { - v interface {} + v interface{} err error } -// NewRetryableSourceRetriever create a new NewRetryableSourceRetriever -func NewRetryableSourceRetriever(backoff wait.Backoff, requestTimeout time.Duration) *RetryableSourceRetriever { - return &RetryableSourceRetriever{ - RequestTimeout: requestTimeout, - Backoff: backoff, - } -} - +// RetryableSourceRetriever be used to retrieve the result in a RetryableSource. type RetryableSourceRetriever struct { // time to consider failed a call to AskForResult() that didn't return a result yet. RequestTimeout time.Duration - ErrReporter func (err error) + ErrReporter func(err error) // should be reset between calls to Run. - Backoff wait.Backoff - timeoutC chan struct{} + Backoff wait.Backoff + timeoutC chan struct{} timeoutTimer *time.Timer } +// NewRetryableSourceRetriever create a new NewRetryableSourceRetriever +func NewRetryableSourceRetriever(backoff wait.Backoff, requestTimeout time.Duration) *RetryableSourceRetriever { + return &RetryableSourceRetriever{ + RequestTimeout: requestTimeout, + Backoff: backoff, + } +} + // Run gets the result from the specified source. // Any timeout in ctx is respected. func (r *RetryableSourceRetriever) Run(ctx context.Context, source RetryableSource) (interface{}, error) { @@ -58,7 +59,7 @@ func (r *RetryableSourceRetriever) Run(ctx context.Context, source RetryableSour case <-r.timeoutC: // assume result will never come. r.handleError(errors.New("timeout"), source) - case result := <- source.ResultC(): + case result := <-source.ResultC(): err := result.err if err != nil { r.handleError(err, source) From cf03df579d472e82e3f1edbf378d42188f054020 Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 21 Jan 2022 12:47:30 +0100 Subject: [PATCH 3/4] add error handler and validator --- pkg/retry/retry_source.go | 40 +++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/pkg/retry/retry_source.go b/pkg/retry/retry_source.go index 80aea01d1ea3f..62ac1ead862d6 100644 --- a/pkg/retry/retry_source.go +++ b/pkg/retry/retry_source.go @@ -8,14 +8,16 @@ import ( "k8s.io/apimachinery/pkg/util/wait" ) -// RetryableSource is a value that allows asking for a result, and returns the -// corresponding result asynchronously. -// Clients only care about the first value returned in ResultC(). -// AskForResult() can be called several times to retry the result computation, the +// RetryableSource is a proxy with an object that is able to compute a result, but +// that might forget our request, or return an error result, and that returns the +// result asynchronously. +// AskForResult() can be called to request a result, that should be make available in the +// returned channel. Each time AskForResult() is called the previously returned channel is abandoned. +// Retry() can be called several times to retry the result computation, the // RetryableSource is in charge of handling the cancellation of the computation if needed. type RetryableSource interface { - AskForResult() - ResultC() chan *Result + AskForResult() chan *Result + Retry() } // Result wraps a pair (result, err) produced by a source. By convention @@ -29,7 +31,11 @@ type Result struct { type RetryableSourceRetriever struct { // time to consider failed a call to AskForResult() that didn't return a result yet. RequestTimeout time.Duration - ErrReporter func(err error) + // optionally specify a function to invoke on each error. waitDuration is the time until + // the next retry. + OnError func(err error, timeToNextRetry time.Duration) + // optionally specify a validation function for each result. + ValidateResult func(interface{}) bool // should be reset between calls to Run. Backoff wait.Backoff timeoutC chan struct{} @@ -49,7 +55,7 @@ func NewRetryableSourceRetriever(backoff wait.Backoff, requestTimeout time.Durat func (r *RetryableSourceRetriever) Run(ctx context.Context, source RetryableSource) (interface{}, error) { r.timeoutC = make(chan struct{}) - source.AskForResult() + resultC := source.AskForResult() r.setTimeoutTimer(r.RequestTimeout) defer r.setTimeoutTimer(-1) for { @@ -59,24 +65,30 @@ func (r *RetryableSourceRetriever) Run(ctx context.Context, source RetryableSour case <-r.timeoutC: // assume result will never come. r.handleError(errors.New("timeout"), source) - case result := <-source.ResultC(): + case result := <-resultC: err := result.err if err != nil { r.handleError(err, source) } else { - return result.v, nil + if r.ValidateResult != nil && !r.ValidateResult(result.v) { + err := errors.Errorf("validation failed for value %v", result.v) + r.handleError(err, source) + } else { + return result.v, nil + } } } } } func (r *RetryableSourceRetriever) handleError(err error, source RetryableSource) { - if r.ErrReporter != nil { - r.ErrReporter(err) + waitDuration := r.Backoff.Step() + if r.OnError != nil { + r.OnError(err, waitDuration) } r.setTimeoutTimer(-1) - time.AfterFunc(r.Backoff.Step(), func() { - source.AskForResult() + time.AfterFunc(waitDuration, func() { + source.Retry() r.setTimeoutTimer(r.RequestTimeout) }) } From 29f5cf6a2e4747870b1819ffce127a0c3341a67b Mon Sep 17 00:00:00 2001 From: Juan Rodriguez Hortala Date: Fri, 21 Jan 2022 12:50:54 +0100 Subject: [PATCH 4/4] Pass context on AskForResult --- pkg/retry/retry_source.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/retry/retry_source.go b/pkg/retry/retry_source.go index 62ac1ead862d6..f85440eeaa158 100644 --- a/pkg/retry/retry_source.go +++ b/pkg/retry/retry_source.go @@ -16,7 +16,7 @@ import ( // Retry() can be called several times to retry the result computation, the // RetryableSource is in charge of handling the cancellation of the computation if needed. type RetryableSource interface { - AskForResult() chan *Result + AskForResult(ctx context.Context) chan *Result Retry() } @@ -55,7 +55,7 @@ func NewRetryableSourceRetriever(backoff wait.Backoff, requestTimeout time.Durat func (r *RetryableSourceRetriever) Run(ctx context.Context, source RetryableSource) (interface{}, error) { r.timeoutC = make(chan struct{}) - resultC := source.AskForResult() + resultC := source.AskForResult(ctx) r.setTimeoutTimer(r.RequestTimeout) defer r.setTimeoutTimer(-1) for {