From c6cac7c616eb7809c99e0d1501d39ddebeea8fec Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Fri, 3 Mar 2023 10:24:58 -0800 Subject: [PATCH] Merge pull request #113270 from rrangith/fix/create-pvc-for-pending-pod Automatically recreate PVC for pending STS pod datadog:patch --- .../statefulset/stateful_pod_control.go | 16 +++ .../statefulset/stateful_set_control.go | 12 ++ .../statefulset/stateful_set_control_test.go | 40 ++++++ .../statefulset/stateful_set_utils.go | 5 + test/e2e/apps/statefulset.go | 119 ++++++++++++++++++ test/e2e/framework/node/wait.go | 17 +++ test/e2e/framework/statefulset/wait.go | 25 ++++ 7 files changed, 234 insertions(+) diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index 0d045763e33fb..68a2c7fb83c46 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -314,6 +314,22 @@ func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulS } } +// createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy +func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { + if err := spc.createPersistentVolumeClaims(set, pod); err != nil { + return err + } + + if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { + // Set PVC policy as much as is possible at this point. + if err := spc.UpdatePodClaimForRetentionPolicy(set, pod); err != nil { + spc.recordPodEvent("update", set, pod, err) + return err + } + } + return nil +} + // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of // set. If all of the claims for Pod are successfully created, the returned error is nil. 
If creation fails, this method // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 6931bec85e357..a91a8ae60095f 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -452,6 +452,18 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet( // pod created, no more work possible for this round continue } + + // If the Pod is in pending state then trigger PVC creation to create missing PVCs + if isPending(replicas[i]) { + klog.V(4).Infof( + "StatefulSet %s/%s is triggering PVC creation for pending Pod %s", + set.Namespace, + set.Name, + replicas[i].Name) + if err := ssc.podControl.createMissingPersistentVolumeClaims(set, replicas[i]); err != nil { + return &status, err + } + } // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. 
if isTerminating(replicas[i]) && monotonic { diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 631deb51d98eb..aaaf8afd11ff1 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -157,6 +157,7 @@ func TestStatefulSetControl(t *testing.T) { {UpdatePodFailure, simpleSetFn}, {UpdateSetStatusFailure, simpleSetFn}, {PodRecreateDeleteFailure, simpleSetFn}, + {RecreatesPVCForPendingPod, simpleSetFn}, } for _, testCase := range testCases { @@ -552,6 +553,45 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in } } +func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) { + client := fake.NewSimpleClientset() + om, _, ssc, _ := setupController(client) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Error(err) + } + pods, err := om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Errorf("Error updating StatefulSet %s", err) + } + if err := invariants(set, om); err != nil { + t.Error(err) + } + pods, err = om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } + for _, claim := range getPersistentVolumeClaims(set, pods[0]) { + om.claimsIndexer.Delete(&claim) + } + pods[0].Status.Phase = v1.PodPending + om.podsIndexer.Update(pods[0]) + if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Errorf("Error updating StatefulSet %s", err) + } + // invariants check if there are any missing PVCs for the Pods + if err := invariants(set, om); err != nil { + t.Error(err) + } + _, err = om.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Error(err) + } +} + func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
runTestOverPVCRetentionPolicies( t, "", func(t *testing.T, policy *apps.StatefulSetPersistentVolumeClaimRetentionPolicy) { diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index 3ae194bf052b3..0b77f1561c2af 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -387,6 +387,11 @@ func isCreated(pod *v1.Pod) bool { return pod.Status.Phase != "" } +// isPending returns true if pod has a Phase of PodPending +func isPending(pod *v1.Pod) bool { + return pod.Status.Phase == v1.PodPending +} + // isFailed returns true if pod has a Phase of PodFailed func isFailed(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodFailed diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index 6a347e171b179..7bbeb0bf8cfb3 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -38,6 +38,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" @@ -1343,8 +1344,126 @@ var _ = SIGDescribe("StatefulSet", func() { framework.ExpectNoError(err) }) }) + + ginkgo.Describe("Automatically recreate PVC for pending pod when PVC is missing", func() { + ssName := "ss" + labels := map[string]string{ + "foo": "bar", + "baz": "blah", + } + headlessSvcName := "test" + var statefulPodMounts []v1.VolumeMount + var ss *appsv1.StatefulSet + + ginkgo.BeforeEach(func(ctx context.Context) { + statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}} + ss = e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, labels) + }) + + ginkgo.AfterEach(func() { + if ginkgo.CurrentGinkgoTestDescription().Failed { + framework.DumpDebugInfo(c, ns) + } + framework.Logf("Deleting all 
statefulset in ns %v", ns) + e2estatefulset.DeleteAllStatefulSets(c, ns) + }) + + ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Disruptive][Serial]", func(ctx context.Context) { + e2epv.SkipIfNoDefaultStorageClass(c) + + readyNode, err := e2enode.GetRandomReadySchedulableNode(c) + framework.ExpectNoError(err) + hostLabel := "kubernetes.io/hostname" + hostLabelVal := readyNode.Labels[hostLabel] + + ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node + ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns) + _, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming PVC exists") + err = verifyStatefulSetPVCsExist(c, ss, []int{0}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming Pod is ready") + e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1) + podName := getStatefulSetPodNameAtIndex(0, ss) + pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + nodeName := pod.Spec.NodeName + framework.ExpectEqual(nodeName, readyNode.Name) + node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + oldData, err := json.Marshal(node) + framework.ExpectNoError(err) + + node.Spec.Unschedulable = true + + newData, err := json.Marshal(node) + framework.ExpectNoError(err) + + // cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + framework.ExpectNoError(err) + ginkgo.By("Cordoning Node") + _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) + cordoned := true + + defer func() { + if cordoned { + uncordonNode(c, oldData, newData, nodeName) + 
} + }() + + // wait for the node to be unschedulable + e2enode.WaitForNodeSchedulable(c, nodeName, 10*time.Second, false) + + ginkgo.By("Deleting Pod") + err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // wait for the pod to be recreated + e2estatefulset.WaitForStatusCurrentReplicas(c, ss, 1) + _, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + + pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()}) + framework.ExpectNoError(err) + framework.ExpectEqual(len(pvcList.Items), 1) + pvcName := pvcList.Items[0].Name + + ginkgo.By("Deleting PVC") + err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + uncordonNode(c, oldData, newData, nodeName) + cordoned = false + + ginkgo.By("Confirming PVC recreated") + err = verifyStatefulSetPVCsExist(c, ss, []int{0}) + framework.ExpectNoError(err) + + ginkgo.By("Confirming Pod is ready after being recreated") + e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1) + pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{}) + framework.ExpectNoError(err) + framework.ExpectEqual(pod.Spec.NodeName, readyNode.Name) // confirm the pod was scheduled back to the original node + }) + }) }) +func uncordonNode(c clientset.Interface, oldData, newData []byte, nodeName string) { + ginkgo.By("Uncordoning Node") + // uncordon node, by reverting patch + revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{}) + framework.ExpectNoError(err) + _, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{}) + framework.ExpectNoError(err) +} + func kubectlExecWithRetries(ns string, args ...string) (out string) { var err error for i := 0; i 
< 3; i++ { diff --git a/test/e2e/framework/node/wait.go b/test/e2e/framework/node/wait.go index c903ab0779c86..6ded5457103b6 100644 --- a/test/e2e/framework/node/wait.go +++ b/test/e2e/framework/node/wait.go @@ -143,6 +143,23 @@ func WaitForNodeToBeReady(c clientset.Interface, name string, timeout time.Durat return WaitConditionToBe(c, name, v1.NodeReady, true, timeout) } +func WaitForNodeSchedulable(c clientset.Interface, name string, timeout time.Duration, wantSchedulable bool) bool { + e2elog.Logf("Waiting up to %v for node %s to be schedulable: %t", timeout, name, wantSchedulable) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + node, err := c.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + e2elog.Logf("Couldn't get node %s", name) + continue + } + + if IsNodeSchedulable(node) == wantSchedulable { + return true + } + } + e2elog.Logf("Node %s didn't reach desired schedulable status (%t) within %v", name, wantSchedulable, timeout) + return false +} + // CheckReady waits up to timeout for cluster to has desired size and // there is no not-ready nodes in it. By cluster size we mean number of schedulable Nodes. 
func CheckReady(c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) { diff --git a/test/e2e/framework/statefulset/wait.go b/test/e2e/framework/statefulset/wait.go index c78f2fdcfec3b..2dee960593edf 100644 --- a/test/e2e/framework/statefulset/wait.go +++ b/test/e2e/framework/statefulset/wait.go @@ -171,6 +171,31 @@ func WaitForStatusReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expect } } +// WaitForStatusCurrentReplicas waits for the ss.Status.CurrentReplicas to be equal to expectedReplicas +func WaitForStatusCurrentReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) { + framework.Logf("Waiting for statefulset status.currentReplicas updated to %d", expectedReplicas) + + ns, name := ss.Namespace, ss.Name + pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout, + func() (bool, error) { + ssGet, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if ssGet.Status.ObservedGeneration < ss.Generation { + return false, nil + } + if ssGet.Status.CurrentReplicas != expectedReplicas { + framework.Logf("Waiting for stateful set status.currentReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.CurrentReplicas) + return false, nil + } + return true, nil + }) + if pollErr != nil { + framework.Failf("Failed waiting for stateful set status.currentReplicas updated to %d: %v", expectedReplicas, pollErr) + } +} + // Saturate waits for all Pods in ss to become Running and Ready. func Saturate(c clientset.Interface, ss *appsv1.StatefulSet) { var i int32