Skip to content

Commit

Permalink
fix scheduler bug (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaByte875 authored Feb 21, 2024
1 parent fb68813 commit 64e11eb
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 57 deletions.
121 changes: 64 additions & 57 deletions pkg/scheduler/nodezone/node_zone.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -57,7 +56,7 @@ const (

// preFilterStateKey is the key in CycleState to NodeZone pre-computed data.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
preFilterStateKey = "PreFilter" + Name
preFilterStateKey framework.StateKey = "PreFilter" + Name

// ErrReasonNoLabelTopologyZone is used for predicate error.
ErrReasonNoLabelTopologyZone = "node(s) no topology zone label"
Expand Down Expand Up @@ -179,36 +178,6 @@ func (pl *NodeZone) getTopologyZones() ([]string, error) {
return sortedZones, nil
}

// activateSiblings stashes the pods belonging to the same workload of the given pod
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pl *NodeZone) activateSiblings(pod *corev1.Pod, state *framework.CycleState) {
pods, err := pl.podLister.Pods(pod.Namespace).List(labels.SelectorFromSet(pod.GetLabels()))
if err != nil {
klog.ErrorS(err, "failed to list pods belong to a workload: %v", err)
return
}

for i := range pods {
if pods[i].UID == pod.UID {
pods = append(pods[:i], pods[i+1:]...)
break
}
}

if len(pods) != 0 {
if c, err := state.Read(framework.PodsToActivateKey); err == nil {
if s, ok := c.(*framework.PodsToActivate); ok {
s.Lock()
for _, pod := range pods {
namespacedName := getNamespacedName(pod)
s.Map[namespacedName] = pod
}
s.Unlock()
}
}
}
}

// Filter invoked at the filter extension point.
func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
if len(pod.Spec.TopologySpreadConstraints) == 0 {
Expand Down Expand Up @@ -246,15 +215,15 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState
zoneIndex[zone] = index
}

klog.V(5).Infof("available zones: %v", zones)
klog.V(5).Infof("Available topology zones: %v", zones)

parentName, ordinal := getParentNameAndOrdinal(pod)
m := ordinal % AvailableZones
if m == 0 {
remainder := ordinal % AvailableZones
if remainder == 0 {
return nil
}

anchorOrdinal := ordinal - m
anchorOrdinal := ordinal - remainder
anchorName := getPodNameByOrdinal(parentName, anchorOrdinal)
anchorPod, err := pl.getPod(anchorName, pod.Namespace)
if err != nil {
Expand All @@ -263,7 +232,7 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState
}
}

if anchorPod != nil && anchorPod.Spec.NodeName != "" {
if isPodScheduled(anchorPod) {
anchorNode, err := pl.getNode(anchorPod.Spec.NodeName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s
Expand All @@ -273,10 +242,38 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone)
}
shift := zoneIndex[anchorZone]
klog.V(5).Infof("anchor pod %s zone shift %d", anchorName, shift)
idealZone := zones[(shift+m)%AvailableZones]
klog.V(5).Infof("Anchor pod %s zone %s shift %d", anchorName, anchorZone, shift)
idealZone := zones[(shift+remainder)%AvailableZones]
if idealZone != nodeZone {
klog.V(5).Infof("pod [%s/%s] not fit node %s in zone %s, ideal zone %s", pod.Namespace, pod.Name, nodeInfo.Node().Name, nodeZone, idealZone)
klog.V(5).Infof("Pod [%s/%s] not fit node %s in zone %s, ideal zone %s", pod.Namespace, pod.Name, nodeInfo.Node().Name, nodeZone, idealZone)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch)
}
}

var siblingOrdinal int
if remainder == 1 {
siblingOrdinal = ordinal + 1
} else if remainder == 2 {
siblingOrdinal = ordinal - 1
}
siblingName := getPodNameByOrdinal(parentName, siblingOrdinal)
siblingPod, err := pl.getPod(siblingName, pod.Namespace)
if err != nil {
if !apierrors.IsNotFound(err) {
return framework.AsStatus(err)
}
}
if isPodScheduled(siblingPod) {
siblingNode, err := pl.getNode(siblingPod.Spec.NodeName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s
}
siblingZone, ok := siblingNode.GetLabels()[corev1.LabelTopologyZone]
if !ok {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNoLabelTopologyZone)
}
if siblingZone == nodeZone {
klog.V(5).Infof("Sibling pod [%s/%s] exists in zone %s", pod.Namespace, siblingPod.Name, siblingZone)
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonNotMatch)
}
}
Expand All @@ -285,37 +282,45 @@ func (pl *NodeZone) Filter(ctx context.Context, cycleState *framework.CycleState
}

// Permit is the functions invoked by the framework at "Permit" extension point.
func (pl *NodeZone) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (*framework.Status, time.Duration) {
func (pl *NodeZone) Permit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) (*framework.Status, time.Duration) {
if !needSchedule(pod.Name) {
return framework.NewStatus(framework.Success), 0
}

parentName, ordinal := getParentNameAndOrdinal(pod)
m := ordinal % AvailableZones
if m == 0 {
remainder := ordinal % AvailableZones
if remainder == 0 {
return framework.NewStatus(framework.Success), 0
}

anchorOrdinal := ordinal - m
anchorOrdinal := ordinal - remainder
anchorName := getPodNameByOrdinal(parentName, anchorOrdinal)
anchorPod, err := pl.getPod(anchorName, pod.Namespace)
if err != nil {
if !apierrors.IsNotFound(err) {
return framework.AsStatus(err), 0
}
}
if anchorPod == nil || anchorPod.Spec.NodeName == "" {
// We will also request to move the sibling pods back to activeQ.
pl.activateSiblings(pod, state)
if !isPodScheduled(anchorPod) {
klog.InfoS("Pod is waiting to be scheduled to node", "pod", klog.KObj(pod), "nodeName", nodeName)
return framework.NewStatus(framework.Wait), WaitTime
}
pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod())
if wParentName == parentName && ordinal == wOrdinal {
klog.V(3).InfoS("Permit allows", "pod", klog.KObj(waitingPod.GetPod()))
waitingPod.Allow(pl.Name())
}
})

anchorNode, err := pl.getNode(anchorPod.Spec.NodeName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s, 0
}
anchorZone := anchorNode.GetLabels()[corev1.LabelTopologyZone]
assumedNode, err := pl.getNode(nodeName)
if s := getErrorAsStatus(err); !s.IsSuccess() {
return s, 0
}
assumedZone := assumedNode.GetLabels()[corev1.LabelTopologyZone]
if anchorZone == assumedZone {
klog.V(5).Infof("Zone conflict, anchor pod [%s/%s] exists in zone %s", pod.Namespace, anchorName, anchorZone)
return framework.NewStatus(framework.Unschedulable, ErrReasonNotMatch), 0
}

klog.V(3).InfoS("Permit allows", "pod", klog.KObj(pod))
return framework.NewStatus(framework.Success), 0
}
Expand All @@ -328,14 +333,16 @@ func (pl *NodeZone) Reserve(ctx context.Context, state *framework.CycleState, po
// Unreserve rejects all other adjacent Pods times out.
func (pl *NodeZone) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) {
parentName, ordinal := getParentNameAndOrdinal(pod)
m := ordinal % AvailableZones
if m == 0 {
remainder := ordinal % AvailableZones
if remainder == 0 {
return
}

quotient := ordinal / AvailableZones
pl.handler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
wParentName, wOrdinal := getParentNameAndOrdinal(waitingPod.GetPod())
if wParentName == parentName && ordinal == wOrdinal {
wQuotient := wOrdinal / AvailableZones
if waitingPod.GetPod().Namespace == pod.Namespace && wParentName == parentName && quotient == wQuotient {
klog.V(3).InfoS("Unreserve rejects", "pod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pl.Name(), "rejection in Unreserve")
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/nodezone/node_zone_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ func getPodNameByOrdinal(parentName string, ordinal int) string {
func getNamespacedName(obj metav1.Object) string {
return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName())
}

func isPodScheduled(pod *corev1.Pod) bool {
return pod != nil && pod.DeletionTimestamp == nil && pod.Spec.NodeName != ""
}

0 comments on commit 64e11eb

Please sign in to comment.