Merge pull request kubernetes#113270 from rrangith/fix/create-pvc-for-pending-pod

Automatically recreate PVC for pending STS pod

datadog:patch
k8s-ci-robot authored and nyodas committed Apr 11, 2024
1 parent 3477ca4 commit 0c4838b
Showing 7 changed files with 233 additions and 1 deletion.
16 changes: 16 additions & 0 deletions pkg/controller/statefulset/stateful_pod_control.go
@@ -314,6 +314,22 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS
}
}

// createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy
func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
return err
}

if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
// Set PVC policy as much as is possible at this point.
if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
}
return nil
}

// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
12 changes: 12 additions & 0 deletions pkg/controller/statefulset/stateful_set_control.go
@@ -410,6 +410,18 @@ func (ssc *defaultStatefulSetControl) processReplica(
}
}

// If the Pod is in the Pending phase, trigger PVC creation to recreate any missing PVCs
if isPending(replicas[i]) {
klog.V(4).Infof(
"StatefulSet %s/%s is triggering PVC creation for pending Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.createMissingPersistentVolumeClaims(set, replicas[i]); err != nil {
return true, err
}
}

// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
40 changes: 39 additions & 1 deletion pkg/controller/statefulset/stateful_set_control_test.go
@@ -174,6 +174,7 @@ func TestStatefulSetControl(t *testing.T) {
{UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn},
{RecreatesPVCForPendingPod, simpleSetFn},
}

for _, testCase := range testCases {
@@ -592,7 +593,6 @@ func TestStatefulSetControlWithStartOrdinal(t *testing.T) {
func CreatesPodsWithStartOrdinal(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset(set)
om, _, ssc := setupController(client)

if err := scaleUpStatefulSetControl(set, ssc, om, invariants); err != nil {
t.Errorf("Failed to turn up StatefulSet : %s", err)
}
@@ -627,6 +627,44 @@ func CreatesPodsWithStartOrdinal(t *testing.T, set *apps.StatefulSet, invariants
}
}
}
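
// RecreatesPVCForPendingPod verifies that a PVC deleted out from under a pending Pod is recreated on the next StatefulSet sync.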
func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset()
om, _, ssc := setupController(client)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
for _, claim := range getPersistentVolumeClaims(set, pods[0]) {
om.claimsIndexer.Delete(&claim)
}
pods[0].Status.Phase = v1.PodPending
om.podsIndexer.Update(pods[0])
if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
// invariants check if there are any missing PVCs for the Pods
if err := invariants(set, om); err != nil {
t.Error(err)
}
_, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
}

func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
runTestOverPVCRetentionPolicies(
5 changes: 5 additions & 0 deletions pkg/controller/statefulset/stateful_set_utils.go
@@ -413,6 +413,11 @@ func isCreated(pod *v1.Pod) bool {
return pod.Status.Phase != ""
}

// isPending returns true if pod has a Phase of PodPending
func isPending(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodPending
}

// isFailed returns true if pod has a Phase of PodFailed
func isFailed(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
119 changes: 119 additions & 0 deletions test/e2e/apps/statefulset.go
@@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
@@ -1347,8 +1348,126 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.ExpectNoError(err)
})
})

ginkgo.Describe("Automatically recreate PVC for pending pod when PVC is missing", func() {
ssName := "ss"
labels := map[string]string{
"foo": "bar",
"baz": "blah",
}
headlessSvcName := "test"
var statefulPodMounts []v1.VolumeMount
var ss *appsv1.StatefulSet

ginkgo.BeforeEach(func(ctx context.Context) {
statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
ss = e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, labels)
})

ginkgo.AfterEach(func() {
if ginkgo.CurrentGinkgoTestDescription().Failed {
e2eoutput.DumpDebugInfo(c, ns)
}
framework.Logf("Deleting all statefulset in ns %v", ns)
e2estatefulset.DeleteAllStatefulSets(c, ns)
})

ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Disruptive][Serial]", func(ctx context.Context) {
e2epv.SkipIfNoDefaultStorageClass(c)

readyNode, err := e2enode.GetRandomReadySchedulableNode(c)
framework.ExpectNoError(err)
hostLabel := "kubernetes.io/hostname"
hostLabelVal := readyNode.Labels[hostLabel]

ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node
ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
_, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
framework.ExpectNoError(err)

ginkgo.By("Confirming PVC exists")
err = verifyStatefulSetPVCsExist(c, ss, []int{0})
framework.ExpectNoError(err)

ginkgo.By("Confirming Pod is ready")
e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
podName := getStatefulSetPodNameAtIndex(0, ss)
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)

nodeName := pod.Spec.NodeName
framework.ExpectEqual(nodeName, readyNode.Name)
node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
framework.ExpectNoError(err)

oldData, err := json.Marshal(node)
framework.ExpectNoError(err)

node.Spec.Unschedulable = true

newData, err := json.Marshal(node)
framework.ExpectNoError(err)

// cordon the node to make sure the pod does not get scheduled onto it until the PVC is deleted
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
framework.ExpectNoError(err)
ginkgo.By("Cordoning Node")
_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
framework.ExpectNoError(err)
cordoned := true

defer func() {
if cordoned {
uncordonNode(c, oldData, newData, nodeName)
}
}()

// wait for the node to be unschedulable
e2enode.WaitForNodeSchedulable(c, nodeName, 10*time.Second, false)

ginkgo.By("Deleting Pod")
err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{})
framework.ExpectNoError(err)

// wait for the pod to be recreated
e2estatefulset.WaitForStatusCurrentReplicas(c, ss, 1)
_, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)

pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
framework.ExpectNoError(err)
framework.ExpectEqual(len(pvcList.Items), 1)
pvcName := pvcList.Items[0].Name

ginkgo.By("Deleting PVC")
err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{})
framework.ExpectNoError(err)

uncordonNode(c, oldData, newData, nodeName)
cordoned = false

ginkgo.By("Confirming PVC recreated")
err = verifyStatefulSetPVCsExist(c, ss, []int{0})
framework.ExpectNoError(err)

ginkgo.By("Confirming Pod is ready after being recreated")
e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)
framework.ExpectEqual(pod.Spec.NodeName, readyNode.Name) // confirm the pod was scheduled back to the original node
})
})
})

func uncordonNode(c clientset.Interface, oldData, newData []byte, nodeName string) {
ginkgo.By("Uncordoning Node")
// uncordon the node by reverting the patch
revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
framework.ExpectNoError(err)
_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{})
framework.ExpectNoError(err)
}

func kubectlExecWithRetries(ns string, args ...string) (out string) {
var err error
for i := 0; i < 3; i++ {
17 changes: 17 additions & 0 deletions test/e2e/framework/node/wait.go
@@ -143,6 +143,23 @@ func WaitForNodeToBeReady(c clientset.Interface, name string, timeout time.Durat
return WaitConditionToBe(c, name, v1.NodeReady, true, timeout)
}

// WaitForNodeSchedulable waits up to timeout for the node's schedulability, as reported by IsNodeSchedulable,
// to match wantSchedulable. It returns true if the desired state is reached and false if it times out.
func WaitForNodeSchedulable(c clientset.Interface, name string, timeout time.Duration, wantSchedulable bool) bool {
framework.Logf("Waiting up to %v for node %s to be schedulable: %t", timeout, name, wantSchedulable)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
node, err := c.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
framework.Logf("Couldn't get node %s", name)
continue
}

if IsNodeSchedulable(node) == wantSchedulable {
return true
}
}
framework.Logf("Node %s didn't reach desired schedulable status (%t) within %v", name, wantSchedulable, timeout)
return false
}

// CheckReady waits up to timeout for the cluster to reach the desired size and for
// there to be no not-ready nodes in it. By cluster size we mean the number of schedulable Nodes.
func CheckReady(c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
25 changes: 25 additions & 0 deletions test/e2e/framework/statefulset/wait.go
@@ -171,6 +171,31 @@ func WaitForStatusReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expect
}
}

// WaitForStatusCurrentReplicas waits for the ss.Status.CurrentReplicas to be equal to expectedReplicas
func WaitForStatusCurrentReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
framework.Logf("Waiting for statefulset status.currentReplicas updated to %d", expectedReplicas)

ns, name := ss.Namespace, ss.Name
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
func() (bool, error) {
ssGet, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return false, err
}
if ssGet.Status.ObservedGeneration < ss.Generation {
return false, nil
}
if ssGet.Status.CurrentReplicas != expectedReplicas {
framework.Logf("Waiting for stateful set status.currentReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.CurrentReplicas)
return false, nil
}
return true, nil
})
if pollErr != nil {
framework.Failf("Failed waiting for stateful set status.currentReplicas updated to %d: %v", expectedReplicas, pollErr)
}
}

// Saturate waits for all Pods in ss to become Running and Ready.
func Saturate(c clientset.Interface, ss *appsv1.StatefulSet) {
var i int32
