From 5c0bc07e2296ac6e7ba040b0d6b36545cbf33a2b Mon Sep 17 00:00:00 2001
From: bob
Date: Fri, 20 Jan 2023 18:43:31 +0100
Subject: [PATCH] Fix import change and support sidecar cherry-pick.
 datadog:patch

---
 .../kuberuntime/kuberuntime_container.go      |   8 +-
 .../kuberuntime/kuberuntime_manager.go        | 308 +++++++++++++++++-
 test/e2e/node/pods.go                         |   8 +-
 3 files changed, 309 insertions(+), 15 deletions(-)

diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go
index e76a93898fb53..d496f1a63ff63 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_container.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go
@@ -775,6 +775,12 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Con
 		sidecars    []*kubecontainer.Container
 		nonSidecars []*kubecontainer.Container
 	)
+
+	if gracePeriodOverride == nil {
+		minGracePeriod := int64(minimumGracePeriodInSeconds)
+		gracePeriodOverride = &minGracePeriod
+	}
+
 	for _, container := range runningPod.Containers {
 		if isSidecar(pod, container.Name) {
 			sidecars = append(sidecars, container)
@@ -785,7 +791,7 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Con
 	containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
 	// non-sidecars first
 	start := time.Now()
-	klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %s termination period", runningPod.Name, len(nonSidecars), gracePeriodOverride)
+	klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %ds termination grace period", runningPod.Name, len(nonSidecars), *gracePeriodOverride)
 	nonSidecarsWg := sync.WaitGroup{}
 	nonSidecarsWg.Add(len(nonSidecars))
 	for _, container := range nonSidecars {
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
index 3e320a055ae1f..663949c2cf014 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -28,7 +28,7 @@ import (
 	cadvisorapi "github.com/google/cadvisor/info/v1"
 	"go.opentelemetry.io/otel/trace"
 	crierror "k8s.io/cri-api/pkg/errors"
-	"k8s.io/klog/v2"
+	klog "k8s.io/klog/v2"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -538,6 +538,285 @@ func isSidecar(pod *v1.Pod, containerName string) bool {
 	return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar"
 }
 
+func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
+	if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
+		return false
+	}
+	if types.IsStaticPod(pod) {
+		return false
+	}
+	return true
+}
+
+func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool {
+	container := pod.Spec.Containers[containerIdx]
+	if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 {
+		return true
+	}
+
+	// Determine if the *running* container needs a resource update by comparing v1.Spec.Resources (desired)
+	// with v1.Status.Resources / runtime.Status.Resources (last known actual).
+	// Proceed only when the kubelet has accepted the resize, i.e. v1.Spec.Resources.Requests == v1.Status.AllocatedResources.
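+	// (That is, the resize has been admitted by the kubelet and recorded in AllocatedResources; until then there is nothing to actuate.)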
+	// Skip if the runtime containerID doesn't match the pod.Status containerID (container is restarting).
+	apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name)
+	if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil ||
+		kubeContainerStatus.State != kubecontainer.ContainerStateRunning ||
+		kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID ||
+		len(diff.ObjectDiff(container.Resources.Requests, apiContainerStatus.AllocatedResources)) != 0 {
+		return true
+	}
+
+	desiredMemoryLimit := container.Resources.Limits.Memory().Value()
+	desiredCPULimit := container.Resources.Limits.Cpu().MilliValue()
+	desiredCPURequest := container.Resources.Requests.Cpu().MilliValue()
+	currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value()
+	currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue()
+	currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue()
+	// Runtime container status resources (from CRI), if set, supersede v1 (API) container status resources.
+	if kubeContainerStatus.Resources != nil {
+		if kubeContainerStatus.Resources.MemoryLimit != nil {
+			currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value()
+		}
+		if kubeContainerStatus.Resources.CPULimit != nil {
+			currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue()
+		}
+		if kubeContainerStatus.Resources.CPURequest != nil {
+			currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue()
+		}
+	}
+
+	// Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during
+	// handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest.
+	if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest {
+		return true
+	}
+
+	desiredResources := containerResources{
+		memoryLimit:   desiredMemoryLimit,
+		memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(),
+		cpuLimit:      desiredCPULimit,
+		cpuRequest:    desiredCPURequest,
+	}
+	currentResources := containerResources{
+		memoryLimit:   currentMemoryLimit,
+		memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(),
+		cpuLimit:      currentCPULimit,
+		cpuRequest:    currentCPURequest,
+	}
+
+	resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy)
+	for _, pol := range container.ResizePolicy {
+		resizePolicy[pol.ResourceName] = pol.RestartPolicy
+	}
+	determineContainerResize := func(rName v1.ResourceName, specValue, statusValue int64) (resize, restart bool) {
+		if specValue == statusValue {
+			return false, false
+		}
+		if resizePolicy[rName] == v1.RestartContainer {
+			return true, true
+		}
+		return true, false
+	}
+	markContainerForUpdate := func(rName v1.ResourceName, specValue, statusValue int64) {
+		cUpdateInfo := containerToUpdateInfo{
+			apiContainerIdx:           containerIdx,
+			kubeContainerID:           kubeContainerStatus.ID,
+			desiredContainerResources: desiredResources,
+			currentContainerResources: &currentResources,
+		}
+		// Order the container updates such that resource decreases are applied before increases.
+		switch {
+		case specValue > statusValue: // append
+			changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], cUpdateInfo)
+		case specValue < statusValue: // prepend
+			changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], containerToUpdateInfo{})
+			copy(changes.ContainersToUpdate[rName][1:], changes.ContainersToUpdate[rName])
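+			// The append/copy pair above grows the slice by one and shifts the existing entries right, so the new entry can be written at index 0.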
+			changes.ContainersToUpdate[rName][0] = cUpdateInfo
+		}
+	}
+	resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
+	resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
+	resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
+	if restartCPULim || restartCPUReq || restartMemLim {
+		// The resize policy requires this container to restart.
+		changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{
+			name:      kubeContainerStatus.Name,
+			container: &pod.Spec.Containers[containerIdx],
+			message:   fmt.Sprintf("Container %s resize requires restart", container.Name),
+		}
+		changes.ContainersToStart = append(changes.ContainersToStart, containerIdx)
+		changes.UpdatePodResources = true
+		return false
+	} else {
+		if resizeMemLim {
+			markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
+		}
+		if resizeCPULim {
+			markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
+		} else if resizeCPUReq {
+			markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
+		}
+	}
+	return true
+}
+
+func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podContainerChanges podActions, result kubecontainer.PodSyncResult) {
+	pcm := m.containerManager.NewPodContainerManager()
+	//TODO(vinaykul,InPlacePodVerticalScaling): Figure out best way to get enforceMemoryQoS value (parameter #4 below) in platform-agnostic way
+	podResources := cm.ResourceConfigForPod(pod, m.cpuCFSQuota, uint64((m.cpuCFSQuotaPeriod.Duration)/time.Microsecond), false)
+	if podResources == nil {
+		klog.ErrorS(nil, "Unable to get resource configuration", "pod", pod.Name)
+		result.Fail(fmt.Errorf("Unable to get resource configuration while processing resize for pod %s", pod.Name))
+		return
+	}
+	setPodCgroupConfig := func(rName v1.ResourceName, setLimitValue bool) error {
+		var err error
+		switch rName {
+		case v1.ResourceCPU:
+			podCpuResources := &cm.ResourceConfig{CPUPeriod: podResources.CPUPeriod}
+			if setLimitValue {
+				podCpuResources.CPUQuota = podResources.CPUQuota
+			} else {
+				podCpuResources.CPUShares = podResources.CPUShares
+			}
+			err = pcm.SetPodCgroupConfig(pod, rName, podCpuResources)
+		case v1.ResourceMemory:
+			err = pcm.SetPodCgroupConfig(pod, rName, podResources)
+		}
+		if err != nil {
+			klog.ErrorS(err, "Failed to set cgroup config", "resource", rName, "pod", pod.Name)
+		}
+		return err
+	}
+	// Memory and CPU are updated separately because memory resizes may be ordered differently than CPU resizes.
+	// If resize results in net pod resource increase, set pod cgroup config before resizing containers.
+	// If resize results in net pod resource decrease, set pod cgroup config after resizing containers.
+	// If an error occurs at any point, abort; let future SyncPod iterations retry the unfinished work.
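+	// For example, a net memory limit increase raises the pod cgroup limit before the container limits, while a net decrease lowers the container limits first.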
+	resizeContainers := func(rName v1.ResourceName, currPodCgLimValue, newPodCgLimValue, currPodCgReqValue, newPodCgReqValue int64) error {
+		var err error
+		if newPodCgLimValue > currPodCgLimValue {
+			if err = setPodCgroupConfig(rName, true); err != nil {
+				return err
+			}
+		}
+		if newPodCgReqValue > currPodCgReqValue {
+			if err = setPodCgroupConfig(rName, false); err != nil {
+				return err
+			}
+		}
+		if len(podContainerChanges.ContainersToUpdate[rName]) > 0 {
+			if err = m.updatePodContainerResources(pod, rName, podContainerChanges.ContainersToUpdate[rName]); err != nil {
+				klog.ErrorS(err, "updatePodContainerResources failed", "pod", format.Pod(pod), "resource", rName)
+				return err
+			}
+		}
+		if newPodCgLimValue < currPodCgLimValue {
+			err = setPodCgroupConfig(rName, true)
+		}
+		if newPodCgReqValue < currPodCgReqValue {
+			if err = setPodCgroupConfig(rName, false); err != nil {
+				return err
+			}
+		}
+		return err
+	}
+	if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources {
+		if podResources.Memory == nil {
+			klog.ErrorS(nil, "podResources.Memory is nil", "pod", pod.Name)
+			result.Fail(fmt.Errorf("podResources.Memory is nil for pod %s", pod.Name))
+			return
+		}
+		currentPodMemoryConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory)
+		if err != nil {
+			klog.ErrorS(err, "GetPodCgroupConfig for memory failed", "pod", pod.Name)
+			result.Fail(err)
+			return
+		}
+		currentPodMemoryUsage, err := pcm.GetPodCgroupMemoryUsage(pod)
+		if err != nil {
+			klog.ErrorS(err, "GetPodCgroupMemoryUsage failed", "pod", pod.Name)
+			result.Fail(err)
+			return
+		}
+		if currentPodMemoryUsage >= uint64(*podResources.Memory) {
+			klog.ErrorS(nil, "Aborting attempt to set pod memory limit less than current memory usage", "pod", pod.Name)
+			result.Fail(fmt.Errorf("Aborting attempt to set pod memory limit less than current memory usage for pod %s", pod.Name))
+			return
+		}
+		if errResize := resizeContainers(v1.ResourceMemory, int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil {
+			result.Fail(errResize)
+			return
+		}
+	}
+	if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources {
+		if podResources.CPUQuota == nil || podResources.CPUShares == nil {
+			klog.ErrorS(nil, "podResources.CPUQuota or podResources.CPUShares is nil", "pod", pod.Name)
+			result.Fail(fmt.Errorf("podResources.CPUQuota or podResources.CPUShares is nil for pod %s", pod.Name))
+			return
+		}
+		currentPodCpuConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceCPU)
+		if err != nil {
+			klog.ErrorS(err, "GetPodCgroupConfig for CPU failed", "pod", pod.Name)
+			result.Fail(err)
+			return
+		}
+		if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota,
+			int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil {
+			result.Fail(errResize)
+			return
+		}
+	}
+}
+
+func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, resourceName v1.ResourceName, containersToUpdate []containerToUpdateInfo) error {
+	klog.V(5).InfoS("Updating container resources", "pod", klog.KObj(pod))
+
+	for _, cInfo := range containersToUpdate {
+		container := pod.Spec.Containers[cInfo.apiContainerIdx].DeepCopy()
+		// If updating memory limit, use most recently configured CPU request and limit values.
+		// If updating CPU request and limit, use most recently configured memory request and limit values.
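+		// A complete ResourceList is sent with each update; the resource not being resized is re-sent with its most recently configured values rather than left empty.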
+		switch resourceName {
+		case v1.ResourceMemory:
+			container.Resources.Limits = v1.ResourceList{
+				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuLimit, resource.DecimalSI),
+				v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryLimit, resource.BinarySI),
+			}
+			container.Resources.Requests = v1.ResourceList{
+				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuRequest, resource.DecimalSI),
+				v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryRequest, resource.BinarySI),
+			}
+		case v1.ResourceCPU:
+			container.Resources.Limits = v1.ResourceList{
+				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuLimit, resource.DecimalSI),
+				v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryLimit, resource.BinarySI),
+			}
+			container.Resources.Requests = v1.ResourceList{
+				v1.ResourceCPU:    *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuRequest, resource.DecimalSI),
+				v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryRequest, resource.BinarySI),
+			}
+		}
+		if err := m.updateContainerResources(pod, container, cInfo.kubeContainerID); err != nil {
+			// Log the error and abort, as container updates need to succeed in the order determined by computePodResizeAction.
+			// The recovery path is for SyncPod to keep retrying at later times until it succeeds.
+			klog.ErrorS(err, "updateContainerResources failed", "container", container.Name, "cID", cInfo.kubeContainerID,
+				"pod", format.Pod(pod), "resourceName", resourceName)
+			return err
+		}
+		// If UpdateContainerResources is error-free, it means the desired values for 'resourceName' were accepted by the runtime.
+		// So we update currentContainerResources for 'resourceName', which is our view of the most recently configured resources.
+		// Note: We can't rely on GetPodStatus as the runtime may lag in actuating the resource values it just accepted.
+		switch resourceName {
+		case v1.ResourceMemory:
+			cInfo.currentContainerResources.memoryLimit = cInfo.desiredContainerResources.memoryLimit
+			cInfo.currentContainerResources.memoryRequest = cInfo.desiredContainerResources.memoryRequest
+		case v1.ResourceCPU:
+			cInfo.currentContainerResources.cpuLimit = cInfo.desiredContainerResources.cpuLimit
+			cInfo.currentContainerResources.cpuRequest = cInfo.desiredContainerResources.cpuRequest
+		}
+	}
+	return nil
+}
+
 // computePodActions checks whether the pod spec has changed and returns the changes if true.
 func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
 	klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))
@@ -578,6 +857,24 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
 		changes.CreateSandbox = false
 		return changes
 	}
+
+	// Get the containers to start, excluding the ones that succeeded if RestartPolicy is OnFailure.
+	var containersToStart []int
+	for idx, c := range pod.Spec.Containers {
+		if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) {
+			continue
+		}
+		containersToStart = append(containersToStart, idx)
+	}
+	// We should not create a sandbox for a Pod if initialization is done and there is no container to start.
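+	// (Otherwise a pod whose containers have all completed would be given a fresh sandbox it will never use.)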
+	if len(containersToStart) == 0 {
+		_, _, done := findNextInitContainerToRun(pod, podStatus)
+		if done {
+			changes.CreateSandbox = false
+			return changes
+		}
+	}
+
 	if len(pod.Spec.InitContainers) != 0 {
 		// Pod has init containers, return the first one.
 		changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
@@ -591,14 +888,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
 		}
 		return changes
 	}
-	// Start all containers by default but exclude the ones that
-	// succeeded if RestartPolicy is OnFailure
-	for idx, c := range pod.Spec.Containers {
-		if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
-			continue
-		}
-		changes.ContainersToStart = append(changes.ContainersToStart, idx)
-	}
+	changes.ContainersToStart = containersToStart
 	return changes
 }
 
diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go
index 7d013e3226c18..1641f4dd6ee5d 100644
--- a/test/e2e/node/pods.go
+++ b/test/e2e/node/pods.go
@@ -29,6 +29,9 @@ import (
 	"sync"
 	"time"
 
+	ginkgo "github.com/onsi/ginkgo/v2"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/expfmt"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -42,11 +45,6 @@ import (
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 	admissionapi "k8s.io/pod-security-admission/api"
-	utilpointer "k8s.io/utils/pointer"
-
-	"github.com/onsi/ginkgo/v2"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/common/expfmt"
 )
 
 var _ = SIGDescribe("Pods Extended", func() {