Skip to content

Commit

Permalink
Fix scale back to 0 with 300 requests scenario (#76)
Browse files Browse the repository at this point in the history
Fixes #73 and includes pr #70 and others from Alex

* Remove compareScales function
* Refactor SetDesiredScale that is simpler to understand by storing
lastSuccessfulScale time
* Improve test coverage for scaler.go

Co-authored-by: Alex Peters <[email protected]>
  • Loading branch information
samos123 and alpe authored Feb 9, 2024
1 parent 6f7cd86 commit 5227052
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 110 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ jobs:
replicas: ["1", "3"]
test_cases:
- { requests: 60, expected_replicas: 1 }
# remove broken test, put this back when scaling issues are solved
# - { requests: 300, expected_replicas: 2 }
- { requests: 300, expected_replicas: 2 }
runs-on: ubuntu-latest
name: E2E Lingo.replicas=${{ matrix.replicas }} requests=${{ matrix.test_cases.requests }} expected_replicas=${{ matrix.test_cases.expected_replicas }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ test: test-unit test-race test-integration test-e2e

.PHONY: test-unit
test-unit:
go test -mod=readonly -race ./pkg/...
go test -timeout=5m -mod=readonly -race ./pkg/...

.PHONY: test-integration
test-integration: envtest
Expand Down
14 changes: 14 additions & 0 deletions pkg/deployments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (r *Manager) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result,

func (r *Manager) addDeployment(ctx context.Context, d appsv1.Deployment) error {
models := getModelsFromAnnotation(d.GetAnnotations())
log.Printf("deployment: %v models: %v", d.Name, models)
if len(models) == 0 {
return nil
}
Expand All @@ -94,6 +95,7 @@ func (r *Manager) addDeployment(ctx context.Context, d appsv1.Deployment) error
}

deploymentName := d.Name

r.getScaler(deploymentName).UpdateState(
scale.Spec.Replicas,
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/min-replicas", 0),
Expand All @@ -103,6 +105,14 @@ func (r *Manager) addDeployment(ctx context.Context, d appsv1.Deployment) error
return nil
}

func (r *Manager) hasModel(d *appsv1.Deployment) bool {
models := getModelsFromAnnotation(d.GetAnnotations())
if len(models) == 0 {
return false
}
return true
}

func getModelsFromAnnotation(ann map[string]string) []string {
if len(ann) == 0 {
return []string{}
Expand Down Expand Up @@ -165,6 +175,10 @@ func (r *Manager) scaleFunc(ctx context.Context, deploymentName string) func(int
return fmt.Errorf("get: %w", err)
}

if !r.hasModel(&d) {
return fmt.Errorf("not scaling deployment %q: deployment has no models annotation", deploymentName)
}

var scale autoscalingv1.Scale
if err := r.SubResource("scale").Get(ctx, &d, &scale); err != nil {
return fmt.Errorf("get scale: %w", err)
Expand Down
53 changes: 53 additions & 0 deletions pkg/deployments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"reflect"
"testing"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/reconcile"

appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -136,6 +140,55 @@ func TestAddDeployment(t *testing.T) {
}
}

func TestRemoveDeployment(t *testing.T) {
const myDeployment = "myDeployment"
specs := map[string]struct {
setup func(t *testing.T, m *Manager)
expScalers map[string]scale
}{
"single model deployment": {
setup: func(t *testing.T, m *Manager) {
m.setModelMapping("model1", myDeployment)
m.getScaler(myDeployment)
},
expScalers: map[string]scale{},
},
"multi model deployment": {
setup: func(t *testing.T, m *Manager) {
m.setModelMapping("model1", myDeployment)
m.setModelMapping("model2", myDeployment)
m.setModelMapping("other", "other")
m.getScaler(myDeployment)
m.getScaler("other")
},
expScalers: map[string]scale{"other": {Current: -1}},
},
"unknown deployment - ignored": {
setup: func(t *testing.T, m *Manager) {
m.setModelMapping("other", "other")
m.getScaler("other")
},
expScalers: map[string]scale{"other": {Current: -1}},
},
}
for name, spec := range specs {
t.Run(name, func(t *testing.T) {
m := &Manager{
scalers: make(map[string]*scaler),
modelToDeployment: make(map[string]string),
}
spec.setup(t, m)
req := reconcile.Request{NamespacedName: types.NamespacedName{Name: myDeployment}}
// when
m.removeDeployment(req)
// then
_, exists := m.ResolveDeployment(myDeployment)
assert.False(t, exists)
assert.Equal(t, spec.expScalers, m.getScalesSnapshot())
})
}
}

type partialFakeClient struct {
client.Client
subRes client.Object
Expand Down
135 changes: 53 additions & 82 deletions pkg/deployments/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,44 @@ type scaler struct {
minScale int32
maxScale int32

// The time that scaler decided to scale down which gets delayed by scaleDownDelay
desiredScaleDownTime time.Time

// scaleFuncMtx ensures the scale function is not run concurrently.
scaleFuncMtx sync.Mutex
scaleFunc func(n int32, atLeastOne bool) error
scaleFunc func(n int32, atLeastOne bool)

scaleDownDelay time.Duration
scaleDownStarted bool
scaleDownTimer *time.Timer
scaleDownDelay time.Duration
}

func newScaler(scaleDownDelay time.Duration, scaleFunc func(int32, bool) error) *scaler {
s := &scaler{
// -1 represents unknown
currentScale: -1,
desiredScale: -1,
scaleDownDelay: scaleDownDelay,
}

// do error handling by logging here and do not return error
s.scaleFunc = func(n int32, atLeastOne bool) {
s.scaleFuncMtx.Lock()
err := scaleFunc(n, atLeastOne)
s.scaleFuncMtx.Unlock()

if err != nil {
log.Printf("error scaling: %+v", err)
}
}

return s
}

// AtLeastOne schedules a scale up if the current scale is zero.
func (s *scaler) AtLeastOne() {
s.mtx.Lock()
defer s.mtx.Unlock()
log.Printf("AtLeastOne()")
if err := s.scaleFunc(-1, true); err != nil {
log.Printf("scale error: %v", err)
}
s.scaleFunc(-1, true)
}

// UpdateState updates the current state of the scaler
Expand All @@ -46,15 +67,36 @@ func (s *scaler) UpdateState(replicas, min, max int32) {
// SetDesiredScale sets the desired scale of the scaler and scales
// if needed.
func (s *scaler) SetDesiredScale(n int32) {
log.Printf("SetDesiredScale(%v)", n)
s.compareScales(-1, s.applyMinMax(n))
log.Printf("SetDesiredScale(%v), current: %v, min: %v, max: %v", n, s.currentScale, s.minScale, s.maxScale)
s.mtx.Lock()
defer s.mtx.Unlock()
s.desiredScale = s.applyMinMax(n)

if s.desiredScale > s.currentScale {
// Scale up immediately.
log.Printf("Scaling up to %v immediately", s.desiredScale)
s.scaleFunc(s.desiredScale, false)
s.desiredScaleDownTime = time.Time{}
} else if s.desiredScale == s.currentScale {
log.Printf("Desired scale %v equals current scale %v, doing nothing", s.desiredScale, s.currentScale)
s.desiredScaleDownTime = time.Time{}
} else {
if s.desiredScaleDownTime.IsZero() {
s.desiredScaleDownTime = time.Now()
expectedNextScaleDown := s.desiredScaleDownTime.Add(s.scaleDownDelay)
log.Printf("Delaying scale down to happen on or after %v", expectedNextScaleDown)
} else if time.Since(s.desiredScaleDownTime) >= s.scaleDownDelay {
log.Printf("Scaling down to %v immediately", s.desiredScale)
s.scaleFunc(s.desiredScale, false)
}
}
}

// applyMinMax applies the min and max scale to the given number
// function needs to be called within the locked scaler.mtx
func (s *scaler) applyMinMax(n int32) int32 {
s.mtx.Lock()
min := s.minScale
max := s.maxScale
s.mtx.Unlock()
if n < min {
n = min
} else if n > max {
Expand All @@ -63,54 +105,6 @@ func (s *scaler) applyMinMax(n int32) int32 {
return n
}

func (s *scaler) compareScales(current, desired int32) {
s.mtx.Lock()
defer s.mtx.Unlock()

if current != -1 {
s.currentScale = current
}
if desired != -1 {
s.desiredScale = desired
}

if s.currentScale == -1 || s.desiredScale == -1 {
// Nothing to compare if we only have partial information
return
}

log.Printf("Comparing scales, current: %v, desired: %v", s.currentScale, s.desiredScale)

if s.desiredScale > s.currentScale {
// Scale up immediately.
go s.scaleFunc(s.desiredScale, false)
s.scaleDownStarted = false
} else if s.desiredScale == s.currentScale {
// Do nothing, schedule nothing.
if s.scaleDownTimer != nil {
s.scaleDownTimer.Stop()
}
s.scaleDownStarted = false
} else {
// Schedule a scale down.

if s.scaleDownTimer == nil {
s.scaleDownTimer = time.AfterFunc(s.scaleDownDelay, func() {
if err := s.scaleFunc(s.desiredScale, false); err != nil {
log.Printf("task: run error: %v", err)
} else {
s.scaleDownStarted = false
s.compareScales(s.desiredScale, -1)
}
})
} else if !s.scaleDownStarted {
s.scaleDownTimer.Reset(s.scaleDownDelay)
}

s.scaleDownStarted = true
}
}

type scale struct {
Current, Min, Max int32
}
Expand All @@ -120,26 +114,3 @@ func (s *scaler) getScale() scale {
defer s.mtx.Unlock()
return scale{Current: s.currentScale, Min: s.minScale, Max: s.maxScale}
}

func newScaler(scaleDownDelay time.Duration, scaleFunc func(int32, bool) error) *scaler {
s := &scaler{
// -1 represents unknown
currentScale: -1,
desiredScale: -1,
scaleDownDelay: scaleDownDelay,
}

s.scaleFunc = func(n int32, atLeastOne bool) error {
s.scaleFuncMtx.Lock()
err := scaleFunc(n, atLeastOne)
s.scaleFuncMtx.Unlock()

if err != nil {
return err
}

return nil
}

return s
}
Loading

0 comments on commit 5227052

Please sign in to comment.