Skip to content

Commit

Permalink
Fix change to import and support sidecar cherry-pick.
Browse files Browse the repository at this point in the history
datadog:patch
  • Loading branch information
nyodas committed Apr 11, 2024
1 parent ab3844e commit 5c0bc07
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 15 deletions.
8 changes: 7 additions & 1 deletion pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,12 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Con
sidecars []*kubecontainer.Container
nonSidecars []*kubecontainer.Container
)

if gracePeriodOverride == nil {
minGracePeriod := int64(minimumGracePeriodInSeconds)
gracePeriodOverride = &minGracePeriod
}

for _, container := range runningPod.Containers {
if isSidecar(pod, container.Name) {
sidecars = append(sidecars, container)
Expand All @@ -785,7 +791,7 @@ func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(ctx context.Con
containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
// non-sidecars first
start := time.Now()
klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %s termination period", runningPod.Name, len(nonSidecars), gracePeriodOverride)
klog.Infof("Pod: %s, killContainersWithSyncResult: killing %d non-sidecars, %d termination period", runningPod.Name, len(nonSidecars), *gracePeriodOverride)
nonSidecarsWg := sync.WaitGroup{}
nonSidecarsWg.Add(len(nonSidecars))
for _, container := range nonSidecars {
Expand Down
308 changes: 299 additions & 9 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"go.opentelemetry.io/otel/trace"
crierror "k8s.io/cri-api/pkg/errors"
"k8s.io/klog/v2"
klog "k8s.io/klog/v2"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -538,6 +538,285 @@ func isSidecar(pod *v1.Pod, containerName string) bool {
return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar"
}

func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool {
if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
return false
}
if types.IsStaticPod(pod) {
return false
}
return true
}

func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool {
container := pod.Spec.Containers[containerIdx]
if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 {
return true
}

// Determine if the *running* container needs resource update by comparing v1.Spec.Resources (desired)
// with v1.Status.Resources / runtime.Status.Resources (last known actual).
// Proceed only when kubelet has accepted the resize a.k.a v1.Spec.Resources.Requests == v1.Status.AllocatedResources.
// Skip if runtime containerID doesn't match pod.Status containerID (container is restarting)
apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name)
if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil ||
kubeContainerStatus.State != kubecontainer.ContainerStateRunning ||
kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID ||
len(diff.ObjectDiff(container.Resources.Requests, apiContainerStatus.AllocatedResources)) != 0 {
return true
}

desiredMemoryLimit := container.Resources.Limits.Memory().Value()
desiredCPULimit := container.Resources.Limits.Cpu().MilliValue()
desiredCPURequest := container.Resources.Requests.Cpu().MilliValue()
currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value()
currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue()
currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue()
// Runtime container status resources (from CRI), if set, supercedes v1(api) container status resrouces.
if kubeContainerStatus.Resources != nil {
if kubeContainerStatus.Resources.MemoryLimit != nil {
currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value()
}
if kubeContainerStatus.Resources.CPULimit != nil {
currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue()
}
if kubeContainerStatus.Resources.CPURequest != nil {
currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue()
}
}

// Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during
// handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest.
if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest {
return true
}

desiredResources := containerResources{
memoryLimit: desiredMemoryLimit,
memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(),
cpuLimit: desiredCPULimit,
cpuRequest: desiredCPURequest,
}
currentResources := containerResources{
memoryLimit: currentMemoryLimit,
memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(),
cpuLimit: currentCPULimit,
cpuRequest: currentCPURequest,
}

resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy)
for _, pol := range container.ResizePolicy {
resizePolicy[pol.ResourceName] = pol.RestartPolicy
}
determineContainerResize := func(rName v1.ResourceName, specValue, statusValue int64) (resize, restart bool) {
if specValue == statusValue {
return false, false
}
if resizePolicy[rName] == v1.RestartContainer {
return true, true
}
return true, false
}
markContainerForUpdate := func(rName v1.ResourceName, specValue, statusValue int64) {
cUpdateInfo := containerToUpdateInfo{
apiContainerIdx: containerIdx,
kubeContainerID: kubeContainerStatus.ID,
desiredContainerResources: desiredResources,
currentContainerResources: &currentResources,
}
// Order the container updates such that resource decreases are applied before increases
switch {
case specValue > statusValue: // append
changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], cUpdateInfo)
case specValue < statusValue: // prepend
changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], containerToUpdateInfo{})
copy(changes.ContainersToUpdate[rName][1:], changes.ContainersToUpdate[rName])
changes.ContainersToUpdate[rName][0] = cUpdateInfo
}
}
resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
if restartCPULim || restartCPUReq || restartMemLim {
// resize policy requires this container to restart
changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{
name: kubeContainerStatus.Name,
container: &pod.Spec.Containers[containerIdx],
message: fmt.Sprintf("Container %s resize requires restart", container.Name),
}
changes.ContainersToStart = append(changes.ContainersToStart, containerIdx)
changes.UpdatePodResources = true
return false
} else {
if resizeMemLim {
markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit)
}
if resizeCPULim {
markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit)
} else if resizeCPUReq {
markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest)
}
}
return true
}

func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podContainerChanges podActions, result kubecontainer.PodSyncResult) {
pcm := m.containerManager.NewPodContainerManager()
//TODO(vinaykul,InPlacePodVerticalScaling): Figure out best way to get enforceMemoryQoS value (parameter #4 below) in platform-agnostic way
podResources := cm.ResourceConfigForPod(pod, m.cpuCFSQuota, uint64((m.cpuCFSQuotaPeriod.Duration)/time.Microsecond), false)
if podResources == nil {
klog.ErrorS(nil, "Unable to get resource configuration", "pod", pod.Name)
result.Fail(fmt.Errorf("Unable to get resource configuration processing resize for pod %s", pod.Name))
return
}
setPodCgroupConfig := func(rName v1.ResourceName, setLimitValue bool) error {
var err error
switch rName {
case v1.ResourceCPU:
podCpuResources := &cm.ResourceConfig{CPUPeriod: podResources.CPUPeriod}
if setLimitValue == true {
podCpuResources.CPUQuota = podResources.CPUQuota
} else {
podCpuResources.CPUShares = podResources.CPUShares
}
err = pcm.SetPodCgroupConfig(pod, rName, podCpuResources)
case v1.ResourceMemory:
err = pcm.SetPodCgroupConfig(pod, rName, podResources)
}
if err != nil {
klog.ErrorS(err, "Failed to set cgroup config", "resource", rName, "pod", pod.Name)
}
return err
}
// Memory and CPU are updated separately because memory resizes may be ordered differently than CPU resizes.
// If resize results in net pod resource increase, set pod cgroup config before resizing containers.
// If resize results in net pod resource decrease, set pod cgroup config after resizing containers.
// If an error occurs at any point, abort. Let future syncpod iterations retry the unfinished stuff.
resizeContainers := func(rName v1.ResourceName, currPodCgLimValue, newPodCgLimValue, currPodCgReqValue, newPodCgReqValue int64) error {
var err error
if newPodCgLimValue > currPodCgLimValue {
if err = setPodCgroupConfig(rName, true); err != nil {
return err
}
}
if newPodCgReqValue > currPodCgReqValue {
if err = setPodCgroupConfig(rName, false); err != nil {
return err
}
}
if len(podContainerChanges.ContainersToUpdate[rName]) > 0 {
if err = m.updatePodContainerResources(pod, rName, podContainerChanges.ContainersToUpdate[rName]); err != nil {
klog.ErrorS(err, "updatePodContainerResources failed", "pod", format.Pod(pod), "resource", rName)
return err
}
}
if newPodCgLimValue < currPodCgLimValue {
err = setPodCgroupConfig(rName, true)
}
if newPodCgReqValue < currPodCgReqValue {
if err = setPodCgroupConfig(rName, false); err != nil {
return err
}
}
return err
}
if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources {
if podResources.Memory == nil {
klog.ErrorS(nil, "podResources.Memory is nil", "pod", pod.Name)
result.Fail(fmt.Errorf("podResources.Memory is nil for pod %s", pod.Name))
return
}
currentPodMemoryConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory)
if err != nil {
klog.ErrorS(err, "GetPodCgroupConfig for memory failed", "pod", pod.Name)
result.Fail(err)
return
}
currentPodMemoryUsage, err := pcm.GetPodCgroupMemoryUsage(pod)
if err != nil {
klog.ErrorS(err, "GetPodCgroupMemoryUsage failed", "pod", pod.Name)
result.Fail(err)
return
}
if currentPodMemoryUsage >= uint64(*podResources.Memory) {
klog.ErrorS(nil, "Aborting attempt to set pod memory limit less than current memory usage", "pod", pod.Name)
result.Fail(fmt.Errorf("Aborting attempt to set pod memory limit less than current memory usage for pod %s", pod.Name))
return
}
if errResize := resizeContainers(v1.ResourceMemory, int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil {
result.Fail(errResize)
return
}
}
if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources {
if podResources.CPUQuota == nil || podResources.CPUShares == nil {
klog.ErrorS(nil, "podResources.CPUQuota or podResources.CPUShares is nil", "pod", pod.Name)
result.Fail(fmt.Errorf("podResources.CPUQuota or podResources.CPUShares is nil for pod %s", pod.Name))
return
}
currentPodCpuConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceCPU)
if err != nil {
klog.ErrorS(err, "GetPodCgroupConfig for CPU failed", "pod", pod.Name)
result.Fail(err)
return
}
if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota,
int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil {
result.Fail(errResize)
return
}
}
}

func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, resourceName v1.ResourceName, containersToUpdate []containerToUpdateInfo) error {
klog.V(5).InfoS("Updating container resources", "pod", klog.KObj(pod))

for _, cInfo := range containersToUpdate {
container := pod.Spec.Containers[cInfo.apiContainerIdx].DeepCopy()
// If updating memory limit, use most recently configured CPU request and limit values.
// If updating CPU request and limit, use most recently configured memory request and limit values.
switch resourceName {
case v1.ResourceMemory:
container.Resources.Limits = v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuLimit, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryLimit, resource.BinarySI),
}
container.Resources.Requests = v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuRequest, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryRequest, resource.BinarySI),
}
case v1.ResourceCPU:
container.Resources.Limits = v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuLimit, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryLimit, resource.BinarySI),
}
container.Resources.Requests = v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuRequest, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryRequest, resource.BinarySI),
}
}
if err := m.updateContainerResources(pod, container, cInfo.kubeContainerID); err != nil {
// Log error and abort as container updates need to succeed in the order determined by computePodResizeAction.
// The recovery path is for SyncPod to keep retrying at later times until it succeeds.
klog.ErrorS(err, "updateContainerResources failed", "container", container.Name, "cID", cInfo.kubeContainerID,
"pod", format.Pod(pod), "resourceName", resourceName)
return err
}
// If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime.
// So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources.
// Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted.
switch resourceName {
case v1.ResourceMemory:
cInfo.currentContainerResources.memoryLimit = cInfo.desiredContainerResources.memoryLimit
cInfo.currentContainerResources.memoryRequest = cInfo.desiredContainerResources.memoryRequest
case v1.ResourceCPU:
cInfo.currentContainerResources.cpuLimit = cInfo.desiredContainerResources.cpuLimit
cInfo.currentContainerResources.cpuRequest = cInfo.desiredContainerResources.cpuRequest
}
}
return nil
}

// computePodActions checks whether the pod spec has changed and returns the changes if true.
func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))
Expand Down Expand Up @@ -578,6 +857,24 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
changes.CreateSandbox = false
return changes
}

// Get the containers to start, excluding the ones that succeeded if RestartPolicy is OnFailure.
var containersToStart []int
for idx, c := range pod.Spec.Containers {
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) {
continue
}
containersToStart = append(containersToStart, idx)
}
// We should not create a sandbox for a Pod if initialization is done and there is no container to start.
if len(containersToStart) == 0 {
_, _, done := findNextInitContainerToRun(pod, podStatus)
if done {
changes.CreateSandbox = false
return changes
}
}

if len(pod.Spec.InitContainers) != 0 {
// Pod has init containers, return the first one.
changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
Expand All @@ -591,14 +888,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *
}
return changes
}
// Start all containers by default but exclude the ones that
// succeeded if RestartPolicy is OnFailure
for idx, c := range pod.Spec.Containers {
if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
continue
}
changes.ContainersToStart = append(changes.ContainersToStart, idx)
}
changes.ContainersToStart = containersToStart
return changes
}

Expand Down
8 changes: 3 additions & 5 deletions test/e2e/node/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ import (
"sync"
"time"

ginkgo "github.com/onsi/ginkgo/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -42,11 +45,6 @@ import (
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
utilpointer "k8s.io/utils/pointer"

"github.com/onsi/ginkgo/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
)

var _ = SIGDescribe("Pods Extended", func() {
Expand Down

0 comments on commit 5c0bc07

Please sign in to comment.