Skip to content

Commit

Permalink
Set min/max replicas from annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
nstogner committed Nov 1, 2023
1 parent 021fb98 commit 56b207f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 6 deletions.
29 changes: 27 additions & 2 deletions deployment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -16,6 +17,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const lingoDomain = "lingo.substratus.ai"

func NewDeploymentManager(mgr ctrl.Manager) (*DeploymentManager, error) {
r := &DeploymentManager{}
r.Client = mgr.GetClient()
Expand Down Expand Up @@ -62,7 +65,7 @@ func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

if ann := d.GetAnnotations(); ann != nil {
modelCSV, ok := ann["lingo-models"]
modelCSV, ok := ann[lingoDomain+"/models"]
if !ok {
return ctrl.Result{}, nil
}
Expand All @@ -81,7 +84,11 @@ func (r *DeploymentManager) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

model := req.Name
r.getScaler(model).SetCurrentScale(scale.Spec.Replicas)
r.getScaler(model).UpdateState(
scale.Spec.Replicas,
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/min-replicas", 0),
getAnnotationInt32(d.GetAnnotations(), lingoDomain+"/max-replicas", 3),
)

return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -136,3 +143,21 @@ func (r *DeploymentManager) ResolveDeployment(model string) (string, bool) {
r.modelToDeploymentMtx.RUnlock()
return deploy, ok
}

// getAnnotationInt32 returns the annotation stored under key, parsed as a
// base-10 int32.
//
// defaultValue is returned when ann is nil, when key is absent, or when the
// stored value is not a valid int32 (malformed or out of range). In the last
// case the parse error is logged.
func getAnnotationInt32(ann map[string]string, key string, defaultValue int32) int32 {
	// Indexing a nil map is safe and reports ok == false, so no separate nil
	// check is needed.
	str, ok := ann[key]
	if !ok {
		return defaultValue
	}

	// bitSize 32 makes ParseInt reject values that do not fit in an int32
	// instead of letting the conversion below wrap silently.
	value, err := strconv.ParseInt(str, 10, 32)
	if err != nil {
		log.Printf("parsing annotation as int: %v", err)
		// Fall back to the default rather than the zero value so that a typo
		// in an annotation cannot, for example, force max-replicas to 0.
		return defaultValue
	}

	return int32(value)
}
31 changes: 27 additions & 4 deletions scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type scaler struct {
mtx sync.Mutex
currentScale int32
desiredScale int32
minScale int32
maxScale int32

// scaleFuncMtx ensures the scale function is not run concurrently.
scaleFuncMtx sync.Mutex
Expand All @@ -26,14 +28,35 @@ func (s *scaler) AtLeastOne() {
s.compareScales(-1, -1, true)
}

func (s *scaler) SetCurrentScale(n int32) {
log.Printf("SetCurrentScale(%v)", n)
s.compareScales(n, -1, false)
// UpdateState logs the call, stores min and max as the scaler's replica
// bounds via setMinMax, and then passes replicas to compareScales as the
// current scale.
func (s *scaler) UpdateState(replicas, min, max int32) {
log.Printf("UpdateState(%v, %v, %v)", replicas, min, max)
// The bounds are stored before compareScales runs.
s.setMinMax(min, max)
// NOTE(review): -1 is passed in the desired-scale position; presumably it
// means "leave the desired scale unchanged" — confirm against compareScales.
s.compareScales(replicas, -1, false)
}

// SetDesiredScale logs the requested scale n, clamps it to the scaler's
// min/max bounds with applyMinMax, and hands the clamped value to
// compareScales as the desired scale.
func (s *scaler) SetDesiredScale(n int32) {
	log.Printf("SetDesiredScale(%v)", n)
	// Submit only the clamped value. Also forwarding the raw n would let a
	// request outside the configured bounds reach compareScales and defeat
	// the limits.
	// NOTE(review): -1 is passed in the current-scale position; presumably it
	// means "leave the current scale unchanged" — confirm against compareScales.
	s.compareScales(-1, s.applyMinMax(n), false)
}

// setMinMax replaces the scaler's stored replica bounds with min and max.
// Both fields are written while holding mtx.
func (s *scaler) setMinMax(min, max int32) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	s.minScale, s.maxScale = min, max
}

// applyMinMax returns n limited to the scaler's stored bounds: minScale when
// n is below it, otherwise maxScale when n is above it, otherwise n itself.
// The lower bound is checked first, so it wins for values below it even if
// the bounds are inverted.
func (s *scaler) applyMinMax(n int32) int32 {
	// Snapshot both bounds under the lock, then compare without holding it.
	s.mtx.Lock()
	lo, hi := s.minScale, s.maxScale
	s.mtx.Unlock()

	switch {
	case n < lo:
		return lo
	case n > hi:
		return hi
	default:
		return n
	}
}

func (s *scaler) compareScales(current, desired int32, zeroToOne bool) {
Expand Down

0 comments on commit 56b207f

Please sign in to comment.