Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: always reset open time on half-open #12

Merged
merged 5 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ jobs:
- uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go }}
- run: go test -race -v ./...
- run: go list -m | xargs go test -race -v
13 changes: 13 additions & 0 deletions breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ const (
stateChangeClose
)

func (s stateChange) String() string {
switch s {
case stateChangeNone:
return "none"
case stateChangeOpen:
return "open"
case stateChangeClose:
return "close"
default:
return "unknown"
}
}

// Observer is used to observe the result of a single wrapped call through the circuit breaker.
// Calls in an open circuit cause no observer to be created.
type Observer interface {
Expand Down
71 changes: 32 additions & 39 deletions breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,23 @@ func TestBreaker_Observe_State(t *testing.T) {
type stages struct {
calls int
failureFunc func(int) bool
waitForHalfOpen bool // whether to put circuit in half-open BEFORE observing the call's result
waitForHalfOpen bool // whether to put circuit in half-open BEFORE observing the call's result
wantStateChange stateChange // expected state change at the END of the stage
}
tests := []struct {
name string
breakers map[string]Breaker
stages []stages
wantState stateChange
name string
breakers map[string]Breaker
stages []stages
}{
{
name: "start closed",
breakers: map[string]Breaker{
"ewma": NewEWMABreaker(10, 0.3),
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.3),
},
wantState: stateChangeNone,
stages: []stages{
{calls: 1, failureFunc: alwaysSuccessful, wantStateChange: stateChangeClose},
},
},
{
name: "always success",
Expand All @@ -52,9 +54,8 @@ func TestBreaker_Observe_State(t *testing.T) {
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.3),
},
stages: []stages{
{calls: 100, failureFunc: alwaysSuccessful},
{calls: 100, failureFunc: alwaysSuccessful, wantStateChange: stateChangeClose},
},
wantState: stateChangeClose,
},
{
name: "always failure",
Expand All @@ -63,9 +64,8 @@ func TestBreaker_Observe_State(t *testing.T) {
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.9),
},
stages: []stages{
{calls: 100, failureFunc: alwaysFailure},
{calls: 100, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
},
wantState: stateChangeOpen,
},
{
name: "start open; finish closed",
Expand All @@ -74,10 +74,9 @@ func TestBreaker_Observe_State(t *testing.T) {
// sliding window is not affected by ordering
},
stages: []stages{
{calls: 100, failureFunc: alwaysFailure},
{calls: 100, failureFunc: alwaysSuccessful},
{calls: 100, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
{calls: 100, failureFunc: alwaysSuccessful, wantStateChange: stateChangeClose},
},
wantState: stateChangeClose,
},
{
name: "start closed; finish open",
Expand All @@ -86,32 +85,29 @@ func TestBreaker_Observe_State(t *testing.T) {
// sliding window is not affected by ordering
},
stages: []stages{
{calls: 100, failureFunc: alwaysSuccessful},
{calls: 100, failureFunc: alwaysFailure},
{calls: 100, failureFunc: alwaysSuccessful, wantStateChange: stateChangeClose},
{calls: 100, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
},
wantState: stateChangeOpen,
},
{
name: "just above threshold opens",
breakers: map[string]Breaker{
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.5),
},
stages: []stages{
{calls: 100, failureFunc: alwaysSuccessful},
{calls: 101, failureFunc: alwaysFailure},
{calls: 100, failureFunc: alwaysSuccessful, wantStateChange: stateChangeClose},
{calls: 101, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
},
wantState: stateChangeOpen,
},
{
name: "just below threshold stays closed",
breakers: map[string]Breaker{
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.5),
},
stages: []stages{
{calls: 101, failureFunc: alwaysSuccessful},
{calls: 100, failureFunc: alwaysFailure},
{calls: 101, failureFunc: alwaysSuccessful, wantStateChange: stateChangeClose},
{calls: 100, failureFunc: alwaysFailure, wantStateChange: stateChangeClose},
},
wantState: stateChangeClose,
},
{
name: "constant low failure rate stays mostly closed (EWMA flaky)",
Expand All @@ -120,9 +116,8 @@ func TestBreaker_Observe_State(t *testing.T) {
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.2),
},
stages: []stages{
{calls: 100, failureFunc: func(int) bool { return rand.Float64() < 0.1 }},
{calls: 100, failureFunc: func(int) bool { return rand.Float64() < 0.1 }, wantStateChange: stateChangeClose},
},
wantState: stateChangeClose,
},
{
name: "constant high failure rate stays mostly open (EWMA flaky)",
Expand All @@ -131,9 +126,8 @@ func TestBreaker_Observe_State(t *testing.T) {
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.2),
},
stages: []stages{
{calls: 100, failureFunc: func(int) bool { return rand.Float64() < 0.4 }},
{calls: 100, failureFunc: func(int) bool { return rand.Float64() < 0.4 }, wantStateChange: stateChangeOpen},
},
wantState: stateChangeOpen,
},
{
name: "single success at half-open enough to close",
Expand All @@ -142,10 +136,9 @@ func TestBreaker_Observe_State(t *testing.T) {
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.1),
},
stages: []stages{
{calls: 100, failureFunc: alwaysFailure},
{calls: 1, failureFunc: alwaysSuccessful, waitForHalfOpen: true},
{calls: 100, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
{calls: 1, failureFunc: alwaysSuccessful, waitForHalfOpen: true, wantStateChange: stateChangeClose},
},
wantState: stateChangeClose,
},
{
name: "single failure at half-open keeps open",
Expand All @@ -154,10 +147,9 @@ func TestBreaker_Observe_State(t *testing.T) {
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.1),
},
stages: []stages{
{calls: 100, failureFunc: alwaysFailure},
{calls: 1, failureFunc: alwaysFailure, waitForHalfOpen: true},
{calls: 100, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
{calls: 1, failureFunc: alwaysFailure, waitForHalfOpen: true, wantStateChange: stateChangeOpen},
},
wantState: stateChangeOpen,
},
{
// we want to re-open fast if we closed on a fluke (to avoid thundering herd agains a service that might be
Expand All @@ -168,11 +160,10 @@ func TestBreaker_Observe_State(t *testing.T) {
"slidingwindow": NewSlidingWindowBreaker(10*time.Second, 0.1),
},
stages: []stages{
{calls: 100, failureFunc: alwaysFailure},
{calls: 1, failureFunc: alwaysSuccessful, waitForHalfOpen: true},
{calls: 1, failureFunc: alwaysFailure},
{calls: 100, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
{calls: 1, failureFunc: alwaysSuccessful, waitForHalfOpen: true, wantStateChange: stateChangeClose},
{calls: 1, failureFunc: alwaysFailure, wantStateChange: stateChangeOpen},
},
wantState: stateChangeOpen,
},
}
for _, tt := range tests {
Expand All @@ -182,9 +173,9 @@ func TestBreaker_Observe_State(t *testing.T) {
t.Run(bName+": "+tt.name, func(t *testing.T) {
t.Parallel()

var lastStateChange stateChange

for _, s := range tt.stages {
var lastStateChange stateChange

for i := 1; i <= s.calls; i++ {
failure := s.failureFunc(i)
switch b := b.(type) {
Expand All @@ -196,13 +187,15 @@ func TestBreaker_Observe_State(t *testing.T) {
// t.Logf("%s: sample %d: failure %v: => %v", tt.name, i, failure, b.circuit.State())
}
}

assert.Equal(t, s.wantStateChange, lastStateChange, "expected %q, got %q", s.wantStateChange, lastStateChange)
}
assert.Equal(t, tt.wantState, lastStateChange)
})
}
}
}

// ignoreNone is a small helper to skip the "none" state change and only record the last "effective" state change.
func ignoreNone(old, new stateChange) stateChange {
if new == stateChangeNone {
return old
Expand Down
28 changes: 14 additions & 14 deletions extensions/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,20 @@ func TestWithPrometheusMetrics(t *testing.T) {

durationsOut1 := `# HELP hoglet_circuit_call_durations_seconds Call durations in seconds
# TYPE hoglet_circuit_call_durations_seconds histogram
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="0.005"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="0.01"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="0.025"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="0.05"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="0.1"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="0.25"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="0.5"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="1"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="2.5"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="5"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="10"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",error="true",le="+Inf"} 1
hoglet_circuit_call_durations_seconds_sum{circuit="test",error="true"} 1
hoglet_circuit_call_durations_seconds_count{circuit="test",error="true"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="0.005"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="0.01"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="0.025"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="0.05"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="0.1"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="0.25"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="0.5"} 0
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="1"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="2.5"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="5"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="10"} 1
hoglet_circuit_call_durations_seconds_bucket{circuit="test",success="false",le="+Inf"} 1
hoglet_circuit_call_durations_seconds_sum{circuit="test",success="false"} 1
hoglet_circuit_call_durations_seconds_count{circuit="test",success="false"} 1
# HELP hoglet_circuit_dropped_calls_total Total number of calls with an open circuit (i.e.: calls that did not reach the wrapped function)
# TYPE hoglet_circuit_dropped_calls_total counter
hoglet_circuit_dropped_calls_total{cause="circuit_open",circuit="test"} 1
Expand Down
30 changes: 18 additions & 12 deletions hoglet.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,27 @@ func (c *Circuit[IN, OUT]) stateForCall() State {
// We reset openedAt to block further calls to pass through when half-open. A success will cause the breaker to
// close. This is slightly racy: multiple goroutines may reach this point concurrently since we do not lock the
// breaker.
c.setOpenedAt(time.Now().UnixMicro())
c.reopen()
}

return state
}

// setOpenedAt sets the time the circuit was opened at.
// Passing a value of 0 closes the cirtuit.
func (c *Circuit[IN, OUT]) setOpenedAt(i int64) {
if i == 0 {
c.openedAt.Store(0)
} else {
// CompareAndSwap is needed to avoid clobbering another goroutine's openedAt value.
c.openedAt.CompareAndSwap(0, i)
}
// open marks the circuit as open, if it not already.
// It is safe for concurrent calls and only the first one will actually set opening time.
func (c *Circuit[IN, OUT]) open() {
// CompareAndSwap is needed to avoid clobbering another goroutine's openedAt value.
c.openedAt.CompareAndSwap(0, time.Now().UnixMicro())
}

// reopen forcefully (re)marks the circuit as open, resetting the half-open time.
func (c *Circuit[IN, OUT]) reopen() {
obitech marked this conversation as resolved.
Show resolved Hide resolved
c.openedAt.Store(time.Now().UnixMicro())
}

// close closes the circuit.
func (c *Circuit[IN, OUT]) close() {
c.openedAt.Store(0)
}

// ObserverForCall returns an [Observer] for the incoming call.
Expand All @@ -176,9 +182,9 @@ func (s stateObserver[IN, OUT]) Observe(failure bool) {
case stateChangeNone:
return // noop
case stateChangeOpen:
s.circuit.setOpenedAt(time.Now().UnixMicro())
s.circuit.open()
case stateChangeClose:
s.circuit.setOpenedAt(0)
s.circuit.close()
}
}

Expand Down
24 changes: 15 additions & 9 deletions hoglet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,25 +183,31 @@ func TestHoglet_Do(t *testing.T) {
t.Parallel()

mt := &mockBreaker{}
h, err := NewCircuit(noop, mt)
h, err := NewCircuit(noop, mt, WithHalfOpenDelay(time.Minute))
require.NoError(t, err)
for i, call := range tt.calls {
if call.halfOpen {
h.setOpenedAt(int64(h.halfOpenDelay))
// simulate passage of time
h.openedAt.Store(int64(time.Now().Add(-h.halfOpenDelay).UnixMicro()))
}

var err error
maybeAssertPanic := assert.NotPanics
if call.wantPanic != nil {
maybeAssertPanic = func(t assert.TestingT, f assert.PanicTestFunc, msgAndArgs ...interface{}) bool {
return assert.PanicsWithValue(t, call.wantPanic, f, msgAndArgs...)
}
}
maybeAssertPanic(t, func() {
_, err = h.Call(context.Background(), call.arg)
})
}, call.wantPanic)
assert.Equal(t, call.wantErr, err, "unexpected error on call %d: %v", i, err)
}
})
}
}

// maybeAssertPanic is a test-table helper to assert that a function panics or not, depending on the value of wantPanic.
func maybeAssertPanic(t *testing.T, f func(), wantPanic any) {
wrapped := assert.NotPanics
if wantPanic != nil {
wrapped = func(t assert.TestingT, f assert.PanicTestFunc, msgAndArgs ...interface{}) bool {
return assert.PanicsWithValue(t, wantPanic, f, msgAndArgs...)
}
}
wrapped(t, f)
}