[Do not merge] alternative approach to clear state when not leader anymore #75

Closed
wants to merge 3 commits into from
1 change: 1 addition & 0 deletions pkg/autoscaler/autoscaler.go
@@ -71,6 +71,7 @@ func (r *Autoscaler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

func (a *Autoscaler) Start() {
a.LeaderElection.AfterOnStoppedLeading(a.Deployments.StopScalers)
Contributor Author
Cleans up the desired scale and any pending scale-down when the instance is not the leader anymore.
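
For context, a minimal, self-contained sketch of the pattern this change wires up (the names below are illustrative, not the repo's actual API): losing leadership runs the registered hooks, and each hook cancels any pending scale-down timer and resets the desired scale to its unset value.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// scalerSketch mimics a scaler that may have a pending scale-down timer.
type scalerSketch struct {
	mtx          sync.Mutex
	desiredScale int32
	scaleDownTmr *time.Timer
}

// Stop cancels the pending scale-down and forgets the desired scale.
func (s *scalerSketch) Stop() {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.scaleDownTmr != nil {
		s.scaleDownTmr.Stop()
	}
	s.desiredScale = -1 // -1 is treated as "unset"
}

// electionSketch runs registered hooks once leadership is lost.
type electionSketch struct {
	hooks []func()
}

func (e *electionSketch) AfterOnStoppedLeading(f func()) { e.hooks = append(e.hooks, f) }

func (e *electionSketch) onStoppedLeading() {
	for _, h := range e.hooks {
		h()
	}
}

func main() {
	s := &scalerSketch{desiredScale: 0}
	// Pretend a scale-down to zero was scheduled to fire later.
	s.scaleDownTmr = time.AfterFunc(50*time.Millisecond, func() {
		fmt.Println("scale-down fired (should not happen after Stop)")
	})

	e := &electionSketch{}
	e.AfterOnStoppedLeading(s.Stop) // analogous to the Autoscaler.Start wiring above

	e.onStoppedLeading() // leadership lost: the hook cancels the timer and resets state
	time.Sleep(100 * time.Millisecond)
	fmt.Println("desiredScale after losing leadership:", s.desiredScale) // -1
}
```

In the diff itself, Autoscaler.Start registers Deployments.StopScalers as such a hook, and each scaler's Stop performs the timer cancellation and state reset.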

Contributor
I think this would break scale-from-zero in the proxy, which should still be possible.

Contributor
I think this would break the h.Deployments.AtLeastOne(deploy) call in the proxies that are not a leader.

Contributor
Never mind, it won't break that since it's a different flow. Currently all Stop does is stop the scale-down timer.

for range time.Tick(a.Interval) {
if !a.LeaderElection.IsLeader.Load() {
log.Println("Not leader, doing nothing")
10 changes: 10 additions & 0 deletions pkg/deployments/manager.go
@@ -115,6 +115,7 @@ func getModelsFromAnnotation(ann map[string]string) []string {
}

func (r *Manager) removeDeployment(req ctrl.Request) {
r.getScaler(req.Name).Stop()
r.scalersMtx.Lock()
delete(r.scalers, req.Name)
r.scalersMtx.Unlock()
@@ -229,6 +230,15 @@ func (r *Manager) ReadinessChecker(_ *http.Request) error {
return nil
}

// StopScalers stops all scheduled scale down processes and resets desired state
func (r *Manager) StopScalers() {
r.scalersMtx.Lock()
defer r.scalersMtx.Unlock()
for _, s := range r.scalers {
s.Stop()
}
}

func getAnnotationInt32(ann map[string]string, key string, defaultValue int32) int32 {
if ann == nil {
return defaultValue
72 changes: 72 additions & 0 deletions pkg/deployments/manager_test.go
@@ -4,6 +4,11 @@ import (
"context"
"reflect"
"testing"
"time"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/reconcile"

appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
@@ -136,6 +141,73 @@ func TestAddDeployment(t *testing.T) {
}
}

func TestRemoveDeployment(t *testing.T) {
const myDeployment = "myDeployment"
specs := map[string]struct {
setup func(t *testing.T, m *Manager)
delay time.Duration
expScalers map[string]scale
}{
"single model deployment": {
setup: func(t *testing.T, m *Manager) {
m.setModelMapping("model1", myDeployment)
m.getScaler(myDeployment)
},
expScalers: map[string]scale{},
},
"multi model deployment": {
setup: func(t *testing.T, m *Manager) {
m.setModelMapping("model1", myDeployment)
m.setModelMapping("model2", myDeployment)
m.setModelMapping("other", "other")
m.getScaler(myDeployment)
m.getScaler("other")
},
expScalers: map[string]scale{"other": {Current: -1}},
},
"unknown deployment - ignored": {
setup: func(t *testing.T, m *Manager) {
m.setModelMapping("other", "other")
m.getScaler("other")
},
expScalers: map[string]scale{"other": {Current: -1}},
},
"scale down timer stopped": {
setup: func(t *testing.T, m *Manager) {
m.setModelMapping("model1", myDeployment)
s := m.getScaler(myDeployment)
s.scaleDownDelay = 50 * time.Millisecond
s.scaleFunc = func(n int32, atLeastOne bool) error {
t.Fatal("scale down timer not stopped")
return nil
}
s.UpdateState(1, 0, 1)
s.SetDesiredScale(0)
require.True(t, s.scaleDownStarted)
},
delay: 80 * time.Millisecond,
expScalers: map[string]scale{},
},
}
for name, spec := range specs {
t.Run(name, func(t *testing.T) {
m := &Manager{
scalers: make(map[string]*scaler),
modelToDeployment: make(map[string]string),
}
spec.setup(t, m)
req := reconcile.Request{NamespacedName: types.NamespacedName{Name: myDeployment}}
// when
m.removeDeployment(req)
time.Sleep(spec.delay) // let scale down timer run
// then
_, exists := m.ResolveDeployment(myDeployment)
assert.False(t, exists)
assert.Equal(t, spec.expScalers, m.getScalesSnapshot())
})
}
}

type partialFakeClient struct {
client.Client
subRes client.Object
32 changes: 25 additions & 7 deletions pkg/deployments/scaler.go
@@ -89,19 +89,22 @@ func (s *scaler) compareScales(current, desired int32) {
if s.desiredScale > s.currentScale {
// Scale up immediately.
go s.scaleFunc(s.desiredScale, false)
s.scaleDownStarted = false
s.stopScaleDown()
} else if s.desiredScale == s.currentScale {
// Do nothing, schedule nothing.
if s.scaleDownTimer != nil {
s.scaleDownTimer.Stop()
}
s.scaleDownStarted = false
s.stopScaleDown()
} else {
// Schedule a scale down.

if s.scaleDownTimer == nil {
s.scaleDownTimer = time.AfterFunc(s.scaleDownDelay, func() {
if err := s.scaleFunc(s.desiredScale, false); err != nil {
s.mtx.Lock()
s.scaleDownStarted = false // mark completed already
desiredScale, currentScale := s.desiredScale, s.currentScale
s.mtx.Unlock()
if desiredScale == -1 || desiredScale == currentScale {
Contributor
s.desiredScale could only be -1 right after initialization, right? Because there is this check earlier in the scaler:

	if current != -1 {
		s.currentScale = current
	}
	if desired != -1 {
		s.desiredScale = desired
	}

Asking to improve my understanding; this is not feedback on your code or a request for any changes.
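
With this PR, desiredScale can be -1 not only right after initialization but also after Stop resets it, which is what the new check in the timer callback guards against. A small self-contained illustration of that sentinel behavior (simplified and assumed from the quoted snippet, not the repo's exact code):

```go
package main

import "fmt"

// sketchScaler keeps -1 as a sentinel for "not yet known".
type sketchScaler struct {
	currentScale int32
	desiredScale int32
}

func newSketchScaler() *sketchScaler {
	return &sketchScaler{currentScale: -1, desiredScale: -1}
}

// updateState mirrors the quoted check: -1 inputs leave the fields untouched.
func (s *sketchScaler) updateState(current, desired int32) {
	if current != -1 {
		s.currentScale = current
	}
	if desired != -1 {
		s.desiredScale = desired
	}
}

// stop resets the desired scale to the sentinel, as the PR's Stop does.
func (s *sketchScaler) stop() {
	s.desiredScale = -1
}

func main() {
	s := newSketchScaler()
	fmt.Println(s.desiredScale) // -1: only right after initialization

	s.updateState(3, 0)         // a real observation overwrites the sentinel
	fmt.Println(s.desiredScale) // 0

	s.updateState(-1, -1)       // -1 inputs are ignored
	fmt.Println(s.desiredScale) // still 0

	s.stop()                    // after Stop, the sentinel is back
	fmt.Println(s.desiredScale) // -1
}
```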

return
}
if err := s.scaleFunc(desiredScale, false); err != nil {
log.Printf("task: run error: %v", err)
} else {
s.scaleDownStarted = false
@@ -116,6 +119,21 @@ func (s *scaler) compareScales(current, desired int32) {
}
}

// Stop stops the scale down process for the scaler and unsets the desired scale
func (s *scaler) Stop() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.stopScaleDown()
s.desiredScale = -1
}

func (s *scaler) stopScaleDown() {
if s.scaleDownTimer != nil && s.scaleDownStarted {
s.scaleDownTimer.Stop()
}
s.scaleDownStarted = false
}

type scale struct {
Current, Min, Max int32
}
24 changes: 22 additions & 2 deletions pkg/leader/election.go
@@ -3,6 +3,7 @@ package leader
import (
"context"
"log"
"sync"
"sync/atomic"
"time"

@@ -25,8 +26,11 @@ func NewElection(clientset kubernetes.Interface, id, namespace string) *Election
},
}

isLeader := &atomic.Bool{}

var (
isLeader = &atomic.Bool{}
hooksMtx sync.RWMutex
hooks []func()
)
config := leaderelection.LeaderElectionConfig{
Lock: lock,
// TODO: Set to true after ensuring autoscaling is done before cancel:
@@ -42,6 +46,11 @@ func NewElection(clientset kubernetes.Interface, id, namespace string) *Election
OnStoppedLeading: func() {
log.Printf("%q stopped leading", id)
isLeader.Store(false)
hooksMtx.RLock()
defer hooksMtx.RUnlock()
for _, hook := range hooks {
hook()
}
},
OnNewLeader: func(identity string) {
if identity == id {
@@ -56,13 +65,17 @@ func NewElection(clientset kubernetes.Interface, id, namespace string) *Election
IsLeader: isLeader,
config: config,
ID: id,
hooksMtx: &hooksMtx,
hooks: &hooks,
}
}

type Election struct {
config leaderelection.LeaderElectionConfig
IsLeader *atomic.Bool
ID string
hooks *[]func()
hooksMtx *sync.RWMutex
}

func (le *Election) Start(ctx context.Context) error {
@@ -80,3 +93,10 @@ func (le *Election) Start(ctx context.Context) error {
}
}
}

func (le *Election) AfterOnStoppedLeading(f func()) {
le.hooksMtx.Lock()
defer le.hooksMtx.Unlock()

*le.hooks = append(*le.hooks, f)
}
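
A note on the hook plumbing: the OnStoppedLeading closure ranges over the same hooks variable that AfterOnStoppedLeading appends to through the *[]func() pointer, so hooks registered after NewElection returns are still visible when leadership is lost. A tiny self-contained sketch of that pointer-to-slice pattern (illustrative names only):

```go
package main

import "fmt"

// registry appends through a pointer so an existing closure sees later additions.
type registry struct {
	hooks *[]func()
}

func (r *registry) add(f func()) {
	*r.hooks = append(*r.hooks, f)
}

func main() {
	var hooks []func()

	// The closure captures the hooks variable itself (not a copy of the slice
	// header), so appends made via the pointer below are visible here.
	onStopped := func() {
		for _, h := range hooks {
			h()
		}
	}

	r := &registry{hooks: &hooks}
	r.add(func() { fmt.Println("cleanup hook ran") })

	onStopped() // prints "cleanup hook ran"
}
```

This is why Autoscaler.Start can call AfterOnStoppedLeading(a.Deployments.StopScalers) well after the election has been constructed.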