diff --git a/pkg/concurrency/retry_ticker.go b/pkg/concurrency/retry_ticker.go new file mode 100644 index 0000000000000..3acf4621d388b --- /dev/null +++ b/pkg/concurrency/retry_ticker.go @@ -0,0 +1,113 @@ +package concurrency + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/stackrox/rox/pkg/sync" + "k8s.io/apimachinery/pkg/util/wait" +) + +var ( + _ RetryTicker = (*retryTickerImpl)(nil) + // ErrStartedTimer is returned when Start is called on a timer that was already started. + ErrStartedTimer = errors.New("started timer") + // ErrStoppedTimer is returned when Start is called on a timer that was stopped. + ErrStoppedTimer = errors.New("stopped timer") +) + +// RetryTicker repeatedly calls a function with a timeout and a retry backoff strategy. +// RetryTickers can only be started once. +// RetryTickers are not safe for simultaneous use by multiple goroutines. +type RetryTicker interface { + Start() error + Stop() +} + +type tickFunc func(ctx context.Context) (timeToNextTick time.Duration, err error) + +// NewRetryTicker returns a new RetryTicker that calls the "tick function" `doFunc` repeatedly: +// - When started, the RetryTicker calls `doFunc` immediately, and if that returns an error +// then the RetryTicker will wait the time returned by `backoff.Step` before calling `doFunc` again. +// - `doFunc` should return an error if ctx is cancelled. RetryTicker always calls `doFunc` with a context +// with a timeout of `timeout`. +// - On success `RetryTicker` will reset `backoff`, and wait the amount of time returned by `doFunc` before +// running it again. +func NewRetryTicker(doFunc tickFunc, timeout time.Duration, backoff wait.Backoff) RetryTicker { + return &retryTickerImpl{ + scheduler: time.AfterFunc, + doFunc: doFunc, + timeout: timeout, + initialBackoff: backoff, + backoff: backoff, + } +} + +type retryTickerImpl struct { + scheduler func(d time.Duration, f func()) *time.Timer + doFunc tickFunc + timeout time.Duration + initialBackoff wait.Backoff + backoff wait.Backoff + timer *time.Timer + mutex sync.RWMutex + stopFlag Flag +} + +// Start calls the tick function and schedules the next tick immediately. +// Start returns and error if the RetryTicker is started more than once: +// - ErrStartedTimer is returned if the timer was already started. +// - ErrStoppedTimer is returned if the timer was stopped. +func (t *retryTickerImpl) Start() error { + if t.stopFlag.Get() { + return ErrStoppedTimer + } + if t.getTickTimer() != nil { + return ErrStartedTimer + } + t.backoff = t.initialBackoff // initialize backoff strategy + t.scheduleTick(0) + return nil +} + +// Stop cancels this RetryTicker. If Stop is called while the tick function is running then Stop does not +// wait for the tick function to complete before returning. +func (t *retryTickerImpl) Stop() { + t.stopFlag.Set(true) + t.setTickTimer(nil) +} + +func (t *retryTickerImpl) scheduleTick(timeToTick time.Duration) { + t.setTickTimer(t.scheduler(timeToTick, func() { + ctx, cancel := context.WithTimeout(context.Background(), t.timeout) + defer cancel() + + nextTimeToTick, tickErr := t.doFunc(ctx) + if t.stopFlag.Get() { + // ticker was stopped while tick function was running. + return + } + if tickErr != nil { + t.scheduleTick(t.backoff.Step()) + return + } + t.backoff = t.initialBackoff // reset backoff strategy + t.scheduleTick(nextTimeToTick) + })) +} + +func (t *retryTickerImpl) setTickTimer(timer *time.Timer) { + t.mutex.Lock() + defer t.mutex.Unlock() + if t.timer != nil { + t.timer.Stop() + } + t.timer = timer +} + +func (t *retryTickerImpl) getTickTimer() *time.Timer { + t.mutex.RLock() + defer t.mutex.RUnlock() + return t.timer +} diff --git a/pkg/concurrency/retry_ticker_test.go b/pkg/concurrency/retry_ticker_test.go new file mode 100644 index 0000000000000..a178b1e4c8856 --- /dev/null +++ b/pkg/concurrency/retry_ticker_test.go @@ -0,0 +1,137 @@ +package concurrency + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/wait" +) + +var ( + testTimeout = 1 * time.Second + longTime = 5 * time.Second + capTime = 100 * time.Millisecond + backoff = wait.Backoff{ + Duration: capTime, + Factor: 1, + Jitter: 0, + Steps: 2, + Cap: capTime, + } +) + +type testTickFunc struct { + mock.Mock +} + +func (f *testTickFunc) doTick(ctx context.Context) (timeToNextTick time.Duration, err error) { + args := f.Called(ctx) + return args.Get(0).(time.Duration), args.Error(1) +} + +func (f *testTickFunc) Step() time.Duration { + f.Called() + return 0 +} + +type afterFuncSpy struct { + mock.Mock +} + +func (f *afterFuncSpy) afterFunc(d time.Duration, fn func()) *time.Timer { + f.Called(d) + return time.AfterFunc(d, fn) +} + +func TestRetryTickerCallsTickFunction(t *testing.T) { + testCases := map[string]struct { + timeToSecondTick time.Duration + firstErr error + }{ + "success": {timeToSecondTick: capTime, firstErr: nil}, + "with error should retry": {timeToSecondTick: 0, firstErr: errors.New("forced")}, + } + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + doneErrSig := NewErrorSignal() + mockFunc := &testTickFunc{} + schedulerSpy := &afterFuncSpy{} + ticker := newRetryTicker(t, mockFunc.doTick) + defer ticker.Stop() + ticker.scheduler = schedulerSpy.afterFunc + + mockFunc.On("doTick", mock.Anything).Return(tc.timeToSecondTick, tc.firstErr).Once() + mockFunc.On("doTick", mock.Anything).Return(longTime, nil).Run(func(args mock.Arguments) { + ticker.Stop() + doneErrSig.Signal() + }).Once() + schedulerSpy.On("afterFunc", time.Duration(0), mock.Anything).Return(nil).Once() + if tc.firstErr == nil { + schedulerSpy.On("afterFunc", tc.timeToSecondTick, mock.Anything).Return(nil).Once() + } else { + schedulerSpy.On("afterFunc", backoff.Duration, mock.Anything).Return(nil).Once() + } + + require.NoError(t, ticker.Start()) + + _, ok := doneErrSig.WaitWithTimeout(testTimeout) + require.True(t, ok, "timeout exceeded") + mockFunc.AssertExpectations(t) + schedulerSpy.AssertExpectations(t) + }) + } +} + +func TestRetryTickerStop(t *testing.T) { + firsTickErrSig := NewErrorSignal() + stopErrSig := NewErrorSignal() + ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) { + firsTickErrSig.Signal() + _, ok := stopErrSig.WaitWithTimeout(testTimeout) + require.True(t, ok) + return capTime, nil + }) + defer ticker.Stop() + + require.NoError(t, ticker.Start()) + _, ok := firsTickErrSig.WaitWithTimeout(testTimeout) + require.True(t, ok, "timeout exceeded") + ticker.Stop() + stopErrSig.Signal() + + // ensure `ticker.scheduleTick` does not schedule a new timer after stopping the ticker + time.Sleep(capTime) + assert.Nil(t, ticker.getTickTimer()) +} + +func TestRetryTickerStartWhileStarterFailure(t *testing.T) { + ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) { + return 0, nil + }) + defer ticker.Stop() + + require.NoError(t, ticker.Start()) + assert.ErrorIs(t, ErrStartedTimer, ticker.Start()) +} + +func TestRetryTickerStartTwiceFailure(t *testing.T) { + ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) { + return 0, nil + }) + defer ticker.Stop() + + require.NoError(t, ticker.Start()) + ticker.Stop() + require.ErrorIs(t, ErrStoppedTimer, ticker.Start()) +} + +func newRetryTicker(t *testing.T, doFunc tickFunc) *retryTickerImpl { + ticker := NewRetryTicker(doFunc, longTime, backoff) + require.IsType(t, &retryTickerImpl{}, ticker) + return ticker.(*retryTickerImpl) +}