Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
681334b
First version of RetryableSourceRetriever
Jan 20, 2022
b801d57
fix style
Jan 20, 2022
b8def21
add error handler and validator
Jan 21, 2022
b35ff30
Pass context on AskForResult
Jan 21, 2022
de7d6f2
Draft CertManager
Jan 18, 2022
a8019a0
Remove references to central, and checkstyle
Jan 19, 2022
38e1959
Only reset backoff on successful refresh
Jan 19, 2022
d8f4a1f
outline of retries tests
Jan 19, 2022
19c8b3f
Move retries with timeout and secret handling out of cert refresher
Jan 21, 2022
6bad26d
fix style
Jan 21, 2022
8a9e935
retry failed recovery
Jan 21, 2022
ad4239c
generalize CertRefresher into generic RetryTicker
Jan 25, 2022
539fb70
remove RetryableSource
Jan 25, 2022
f0a9c32
fix style
Jan 25, 2022
88f5254
cleanup
Jan 26, 2022
fca9cd2
make setTickTimer concurrently safe
Jan 26, 2022
773bbfb
simplify test dropping suite
Jan 26, 2022
0f85a0b
use builder pattern and make event handlers private
Jan 26, 2022
100099f
use import from pkg/sync
Jan 27, 2022
42af3b5
remove handler functions and builder
Jan 27, 2022
2c0954a
use terser field names
Jan 27, 2022
62da8c7
cleanup code lingering from previous change
Jan 31, 2022
f216984
rename backoffPrototype to initialBackoff
Jan 31, 2022
aab2d51
cleanup test and make it easier to read
Jan 31, 2022
5e80465
try to make test more readable moving config to test case struct
Jan 31, 2022
905ba26
improve comments
Jan 31, 2022
8ce6819
rename mock f function as "doTick"
Feb 1, 2022
0969569
make test runs faster by reducing waits
Feb 1, 2022
b4626cf
fix bug where stopped timer was restarted by scheduleTick
Feb 1, 2022
7f8d96e
reset backoff on start
Feb 1, 2022
262d3c0
improve comments
Feb 3, 2022
1c0c961
only allow starting the ticker once
Feb 3, 2022
b6f69a5
simplify ticker interface by removing Start method
Feb 3, 2022
d9fa438
Revert "simplify ticker interface by removing Start method"
Feb 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions pkg/concurrency/retry_ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package concurrency

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/stackrox/rox/pkg/sync"
"k8s.io/apimachinery/pkg/util/wait"
)

var (
_ RetryTicker = (*retryTickerImpl)(nil)
// ErrStartedTimer is returned when Start is called on a timer that was already started.
ErrStartedTimer = errors.New("started timer")
// ErrStoppedTimer is returned when Start is called on a timer that was stopped.
ErrStoppedTimer = errors.New("stopped timer")
)

// RetryTicker repeatedly calls a function with a timeout and a retry backoff strategy.
// RetryTickers can only be started once.
// RetryTickers are not safe for simultaneous use by multiple goroutines.
type RetryTicker interface {
Start() error
Stop()
}

type tickFunc func(ctx context.Context) (timeToNextTick time.Duration, err error)

// NewRetryTicker returns a new RetryTicker that calls the "tick function" `doFunc` repeatedly:
// - When started, the RetryTicker calls `doFunc` immediately, and if that returns an error
// then the RetryTicker will wait the time returned by `backoff.Step` before calling `doFunc` again.
// - `doFunc` should return an error if ctx is cancelled. RetryTicker always calls `doFunc` with a context
// with a timeout of `timeout`.
// - On success `RetryTicker` will reset `backoff`, and wait the amount of time returned by `doFunc` before
// running it again.
func NewRetryTicker(doFunc tickFunc, timeout time.Duration, backoff wait.Backoff) RetryTicker {
return &retryTickerImpl{
scheduler: time.AfterFunc,
doFunc: doFunc,
timeout: timeout,
initialBackoff: backoff,
backoff: backoff,
}
}

type retryTickerImpl struct {
scheduler func(d time.Duration, f func()) *time.Timer
doFunc tickFunc
timeout time.Duration
initialBackoff wait.Backoff
backoff wait.Backoff
timer *time.Timer
mutex sync.RWMutex
stopFlag Flag
}

// Start calls the tick function and schedules the next tick immediately.
// Start returns and error if the RetryTicker is started more than once:
// - ErrStartedTimer is returned if the timer was already started.
// - ErrStoppedTimer is returned if the timer was stopped.
func (t *retryTickerImpl) Start() error {
if t.stopFlag.Get() {
return ErrStoppedTimer
}
if t.getTickTimer() != nil {
return ErrStartedTimer
}
t.backoff = t.initialBackoff // initialize backoff strategy
t.scheduleTick(0)
return nil
}

// Stop cancels this RetryTicker. If Stop is called while the tick function is running then Stop does not
// wait for the tick function to complete before returning.
func (t *retryTickerImpl) Stop() {
t.stopFlag.Set(true)
t.setTickTimer(nil)
}

func (t *retryTickerImpl) scheduleTick(timeToTick time.Duration) {
t.setTickTimer(t.scheduler(timeToTick, func() {
ctx, cancel := context.WithTimeout(context.Background(), t.timeout)
defer cancel()

nextTimeToTick, tickErr := t.doFunc(ctx)
if t.stopFlag.Get() {
// ticker was stopped while tick function was running.
return
}
if tickErr != nil {
t.scheduleTick(t.backoff.Step())
return
}
t.backoff = t.initialBackoff // reset backoff strategy
t.scheduleTick(nextTimeToTick)
}))
}

func (t *retryTickerImpl) setTickTimer(timer *time.Timer) {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.timer != nil {
t.timer.Stop()
}
t.timer = timer
}

func (t *retryTickerImpl) getTickTimer() *time.Timer {
t.mutex.RLock()
defer t.mutex.RUnlock()
return t.timer
}
137 changes: 137 additions & 0 deletions pkg/concurrency/retry_ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package concurrency

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
)

var (
testTimeout = 1 * time.Second
longTime = 5 * time.Second
capTime = 100 * time.Millisecond
backoff = wait.Backoff{
Duration: capTime,
Factor: 1,
Jitter: 0,
Steps: 2,
Cap: capTime,
}
)

type testTickFunc struct {
mock.Mock
}

func (f *testTickFunc) doTick(ctx context.Context) (timeToNextTick time.Duration, err error) {
args := f.Called(ctx)
return args.Get(0).(time.Duration), args.Error(1)
}

func (f *testTickFunc) Step() time.Duration {
f.Called()
return 0
}

type afterFuncSpy struct {
mock.Mock
}

func (f *afterFuncSpy) afterFunc(d time.Duration, fn func()) *time.Timer {
f.Called(d)
return time.AfterFunc(d, fn)
}

func TestRetryTickerCallsTickFunction(t *testing.T) {
testCases := map[string]struct {
timeToSecondTick time.Duration
firstErr error
}{
"success": {timeToSecondTick: capTime, firstErr: nil},
"with error should retry": {timeToSecondTick: 0, firstErr: errors.New("forced")},
}
for tcName, tc := range testCases {
t.Run(tcName, func(t *testing.T) {
doneErrSig := NewErrorSignal()
mockFunc := &testTickFunc{}
schedulerSpy := &afterFuncSpy{}
ticker := newRetryTicker(t, mockFunc.doTick)
defer ticker.Stop()
ticker.scheduler = schedulerSpy.afterFunc

mockFunc.On("doTick", mock.Anything).Return(tc.timeToSecondTick, tc.firstErr).Once()
mockFunc.On("doTick", mock.Anything).Return(longTime, nil).Run(func(args mock.Arguments) {
ticker.Stop()
doneErrSig.Signal()
}).Once()
schedulerSpy.On("afterFunc", time.Duration(0), mock.Anything).Return(nil).Once()
if tc.firstErr == nil {
schedulerSpy.On("afterFunc", tc.timeToSecondTick, mock.Anything).Return(nil).Once()
} else {
schedulerSpy.On("afterFunc", backoff.Duration, mock.Anything).Return(nil).Once()
}

require.NoError(t, ticker.Start())

_, ok := doneErrSig.WaitWithTimeout(testTimeout)
require.True(t, ok, "timeout exceeded")
mockFunc.AssertExpectations(t)
schedulerSpy.AssertExpectations(t)
})
}
}

func TestRetryTickerStop(t *testing.T) {
firsTickErrSig := NewErrorSignal()
stopErrSig := NewErrorSignal()
ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) {
firsTickErrSig.Signal()
_, ok := stopErrSig.WaitWithTimeout(testTimeout)
require.True(t, ok)
return capTime, nil
})
defer ticker.Stop()

require.NoError(t, ticker.Start())
_, ok := firsTickErrSig.WaitWithTimeout(testTimeout)
require.True(t, ok, "timeout exceeded")
ticker.Stop()
stopErrSig.Signal()

// ensure `ticker.scheduleTick` does not schedule a new timer after stopping the ticker
time.Sleep(capTime)
assert.Nil(t, ticker.getTickTimer())
}

func TestRetryTickerStartWhileStarterFailure(t *testing.T) {
ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) {
return 0, nil
})
defer ticker.Stop()

require.NoError(t, ticker.Start())
assert.ErrorIs(t, ErrStartedTimer, ticker.Start())
}

func TestRetryTickerStartTwiceFailure(t *testing.T) {
ticker := newRetryTicker(t, func(ctx context.Context) (timeToNextTick time.Duration, err error) {
return 0, nil
})
defer ticker.Stop()

require.NoError(t, ticker.Start())
ticker.Stop()
require.ErrorIs(t, ErrStoppedTimer, ticker.Start())
}

func newRetryTicker(t *testing.T, doFunc tickFunc) *retryTickerImpl {
ticker := NewRetryTicker(doFunc, longTime, backoff)
require.IsType(t, &retryTickerImpl{}, ticker)
return ticker.(*retryTickerImpl)
}