-
Notifications
You must be signed in to change notification settings - Fork 174
ROX-8969: Add RetryTicker type for running a function periodically with retries #350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
681334b
First version of RetryableSourceRetriever
b801d57
fix style
b8def21
add error handler and validator
b35ff30
Pass context on AskForResult
de7d6f2
Draft CertManager
a8019a0
Remove references to central, and checkstyle
38e1959
Only reset backoff on successful refresh
d8f4a1f
outline of retries tests
19c8b3f
Move retries with timeout and secret handling out of cert refresher
6bad26d
fix style
8a9e935
retry failed recovery
ad4239c
generalize CertRefresher into generic RetryTicker
539fb70
remove RetryableSource
f0a9c32
fix style
88f5254
cleanup
fca9cd2
make setTickTimer concurrently safe
773bbfb
simplify test dropping suite
0f85a0b
use builder pattern and make event handlers private
100099f
use import from pkg/sync
42af3b5
remove handler functions and builder
2c0954a
use terser field names
62da8c7
cleanup code lingering from previous change
f216984
rename backoffPrototype to initialBackoff
aab2d51
cleanup test and make it easier to read
5e80465
try to make test more readable moving config to test case struct
905ba26
improve comments
8ce6819
rename mock f function as "doTick"
0969569
make test runs faster by reducing waits
b4626cf
fix bug where stopped timer was restarted by scheduleTick
7f8d96e
reset backoff on start
262d3c0
improve comments
1c0c961
only allow starting the ticker once
b6f69a5
simplify ticker interface by removing Start method
d9fa438
Revert "simplify ticker interface by removing Start method"
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| package concurrency | ||
|
|
||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| "github.com/pkg/errors" | ||
| "github.com/stackrox/rox/pkg/sync" | ||
| "k8s.io/apimachinery/pkg/util/wait" | ||
| ) | ||
|
|
||
| var ( | ||
| _ RetryTicker = (*retryTickerImpl)(nil) | ||
| // ErrStartedTimer is returned when Start is called on a timer that was already started. | ||
| ErrStartedTimer = errors.New("started timer") | ||
| // ErrStoppedTimer is returned when Start is called on a timer that was stopped. | ||
| ErrStoppedTimer = errors.New("stopped timer") | ||
| ) | ||
|
|
||
| // RetryTicker repeatedly calls a function with a timeout and a retry backoff strategy. | ||
| // RetryTickers can only be started once. | ||
| // RetryTickers are not safe for simultaneous use by multiple goroutines. | ||
| type RetryTicker interface { | ||
| Start() error | ||
| Stop() | ||
| } | ||
|
|
||
| type tickFunc func(ctx context.Context) (timeToNextTick time.Duration, err error) | ||
|
|
||
| // NewRetryTicker returns a new RetryTicker that calls the "tick function" `doFunc` repeatedly: | ||
| // - When started, the RetryTicker calls `doFunc` immediately, and if that returns an error | ||
| // then the RetryTicker will wait the time returned by `backoff.Step` before calling `doFunc` again. | ||
| // - `doFunc` should return an error if ctx is cancelled. RetryTicker always calls `doFunc` with a context | ||
| // with a timeout of `timeout`. | ||
| // - On success `RetryTicker` will reset `backoff`, and wait the amount of time returned by `doFunc` before | ||
| // running it again. | ||
| func NewRetryTicker(doFunc tickFunc, timeout time.Duration, backoff wait.Backoff) RetryTicker { | ||
| return &retryTickerImpl{ | ||
| scheduler: time.AfterFunc, | ||
| doFunc: doFunc, | ||
| timeout: timeout, | ||
| initialBackoff: backoff, | ||
| backoff: backoff, | ||
| } | ||
| } | ||
|
|
||
| type retryTickerImpl struct { | ||
| scheduler func(d time.Duration, f func()) *time.Timer | ||
| doFunc tickFunc | ||
| timeout time.Duration | ||
| initialBackoff wait.Backoff | ||
| backoff wait.Backoff | ||
| timer *time.Timer | ||
| mutex sync.RWMutex | ||
| stopFlag Flag | ||
| } | ||
|
|
||
| // Start calls the tick function and schedules the next tick immediately. | ||
| // Start returns and error if the RetryTicker is started more than once: | ||
| // - ErrStartedTimer is returned if the timer was already started. | ||
| // - ErrStoppedTimer is returned if the timer was stopped. | ||
| func (t *retryTickerImpl) Start() error { | ||
| if t.stopFlag.Get() { | ||
| return ErrStoppedTimer | ||
| } | ||
| if t.getTickTimer() != nil { | ||
| return ErrStartedTimer | ||
| } | ||
| t.backoff = t.initialBackoff // initialize backoff strategy | ||
| t.scheduleTick(0) | ||
| return nil | ||
| } | ||
|
|
||
| // Stop cancels this RetryTicker. If Stop is called while the tick function is running then Stop does not | ||
| // wait for the tick function to complete before returning. | ||
| func (t *retryTickerImpl) Stop() { | ||
| t.stopFlag.Set(true) | ||
| t.setTickTimer(nil) | ||
| } | ||
|
|
||
| func (t *retryTickerImpl) scheduleTick(timeToTick time.Duration) { | ||
| t.setTickTimer(t.scheduler(timeToTick, func() { | ||
| ctx, cancel := context.WithTimeout(context.Background(), t.timeout) | ||
| defer cancel() | ||
|
|
||
| nextTimeToTick, tickErr := t.doFunc(ctx) | ||
| if t.stopFlag.Get() { | ||
| // ticker was stopped while tick function was running. | ||
| return | ||
| } | ||
| if tickErr != nil { | ||
| t.scheduleTick(t.backoff.Step()) | ||
| return | ||
| } | ||
| t.backoff = t.initialBackoff // reset backoff strategy | ||
| t.scheduleTick(nextTimeToTick) | ||
| })) | ||
| } | ||
|
|
||
| func (t *retryTickerImpl) setTickTimer(timer *time.Timer) { | ||
| t.mutex.Lock() | ||
| defer t.mutex.Unlock() | ||
| if t.timer != nil { | ||
| t.timer.Stop() | ||
| } | ||
| t.timer = timer | ||
| } | ||
|
|
||
| func (t *retryTickerImpl) getTickTimer() *time.Timer { | ||
| t.mutex.RLock() | ||
| defer t.mutex.RUnlock() | ||
| return t.timer | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| package concurrency | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/mock" | ||
| "github.com/stretchr/testify/require" | ||
| "k8s.io/apimachinery/pkg/util/wait" | ||
| ) | ||
|
|
||
| var ( | ||
| testTimeout = 1 * time.Second | ||
| longTime = 5 * time.Second | ||
| capTime = 100 * time.Millisecond | ||
| backoff = wait.Backoff{ | ||
| Duration: capTime, | ||
| Factor: 1, | ||
| Jitter: 0, | ||
| Steps: 2, | ||
| Cap: capTime, | ||
| } | ||
| ) | ||
|
|
||
| type testTickFunc struct { | ||
| mock.Mock | ||
| } | ||
|
|
||
| func (f *testTickFunc) doTick(ctx context.Context) (timeToNextTick time.Duration, err error) { | ||
| args := f.Called(ctx) | ||
| return args.Get(0).(time.Duration), args.Error(1) | ||
| } | ||
|
|
||
| func (f *testTickFunc) Step() time.Duration { | ||
| f.Called() | ||
| return 0 | ||
| } | ||
|
|
||
| type afterFuncSpy struct { | ||
| mock.Mock | ||
| } | ||
|
|
||
| func (f *afterFuncSpy) afterFunc(d time.Duration, fn func()) *time.Timer { | ||
| f.Called(d) | ||
| return time.AfterFunc(d, fn) | ||
| } | ||
|
|
||
| func TestRetryTickerCallsTickFunction(t *testing.T) { | ||
| testCases := map[string]struct { | ||
| timeToSecondTick time.Duration | ||
| firstErr error | ||
| }{ | ||
| "success": {timeToSecondTick: capTime, firstErr: nil}, | ||
| "with error should retry": {timeToSecondTick: 0, firstErr: errors.New("forced")}, | ||
| } | ||
| for tcName, tc := range testCases { | ||
| t.Run(tcName, func(t *testing.T) { | ||
| doneErrSig := NewErrorSignal() | ||
| mockFunc := &testTickFunc{} | ||
| schedulerSpy := &afterFuncSpy{} | ||
| ticker := newRetryTicker(t, mockFunc.doTick) | ||
| defer ticker.Stop() | ||
| ticker.scheduler = schedulerSpy.afterFunc | ||
|
|
||
| mockFunc.On("doTick", mock.Anything).Return(tc.timeToSecondTick, tc.firstErr).Once() | ||
| mockFunc.On("doTick", mock.Anything).Return(longTime, nil).Run(func(args mock.Arguments) { | ||
| ticker.Stop() | ||
| doneErrSig.Signal() | ||
| }).Once() | ||
| schedulerSpy.On("afterFunc", time.Duration(0), mock.Anything).Return(nil).Once() | ||
| if tc.firstErr == nil { | ||
| schedulerSpy.On("afterFunc", tc.timeToSecondTick, mock.Anything).Return(nil).Once() | ||
| } else { | ||
| schedulerSpy.On("afterFunc", backoff.Duration, mock.Anything).Return(nil).Once() | ||
| } | ||
|
|
||
| require.NoError(t, ticker.Start()) | ||
|
|
||
| _, ok := doneErrSig.WaitWithTimeout(testTimeout) | ||
| require.True(t, ok, "timeout exceeded") | ||
| mockFunc.AssertExpectations(t) | ||
| schedulerSpy.AssertExpectations(t) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestRetryTickerStop(t *testing.T) { | ||
| firsTickErrSig := NewErrorSignal() | ||
| stopErrSig := NewErrorSignal() | ||
| ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) { | ||
| firsTickErrSig.Signal() | ||
| _, ok := stopErrSig.WaitWithTimeout(testTimeout) | ||
| require.True(t, ok) | ||
| return capTime, nil | ||
| }) | ||
| defer ticker.Stop() | ||
|
|
||
| require.NoError(t, ticker.Start()) | ||
| _, ok := firsTickErrSig.WaitWithTimeout(testTimeout) | ||
| require.True(t, ok, "timeout exceeded") | ||
| ticker.Stop() | ||
| stopErrSig.Signal() | ||
|
|
||
| // ensure `ticker.scheduleTick` does not schedule a new timer after stopping the ticker | ||
| time.Sleep(capTime) | ||
| assert.Nil(t, ticker.getTickTimer()) | ||
| } | ||
|
|
||
| func TestRetryTickerStartWhileStarterFailure(t *testing.T) { | ||
| ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) { | ||
| return 0, nil | ||
| }) | ||
| defer ticker.Stop() | ||
|
|
||
| require.NoError(t, ticker.Start()) | ||
| assert.ErrorIs(t, ErrStartedTimer, ticker.Start()) | ||
| } | ||
|
|
||
| func TestRetryTickerStartTwiceFailure(t *testing.T) { | ||
| ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) { | ||
| return 0, nil | ||
| }) | ||
| defer ticker.Stop() | ||
|
|
||
| require.NoError(t, ticker.Start()) | ||
| ticker.Stop() | ||
| require.ErrorIs(t, ErrStoppedTimer, ticker.Start()) | ||
| } | ||
|
|
||
| func newRetryTicker(t *testing.T, doFunc tickFunc) *retryTickerImpl { | ||
| ticker := NewRetryTicker(doFunc, longTime, backoff) | ||
| require.IsType(t, &retryTickerImpl{}, ticker) | ||
| return ticker.(*retryTickerImpl) | ||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.