From ef2afaa258e02cb41d904fa7b74e92de149de2eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 13 Mar 2024 13:57:54 +0100 Subject: [PATCH] Get node from local cache instead of kube-apiserver cache datadog:patch --- pkg/kubelet/kubelet_node_status.go | 15 +++++---- pkg/kubelet/kubelet_node_status_test.go | 45 +++++++++++++++++++++---- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index d598c55b79a93..dd95173709ada 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -40,7 +40,6 @@ import ( v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/nodestatus" - "k8s.io/kubernetes/pkg/kubelet/util" taintutil "k8s.io/kubernetes/pkg/util/taints" volutil "k8s.io/kubernetes/pkg/volume/util" ) @@ -550,15 +549,19 @@ func (kl *Kubelet) updateNodeStatus(ctx context.Context) error { func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error { // In large clusters, GET and PUT operations on Node objects coming // from here are the majority of load on apiserver and etcd. - // To reduce the load on etcd, we are serving GET operations from - // apiserver cache (the data might be slightly delayed but it doesn't + // To reduce the load on control-plane, we are serving GET operations from + // local lister (the data might be slightly delayed but it doesn't // seem to cause more conflict - the delays are pretty small). // If it result in a conflict, all retries are served directly from etcd. - opts := metav1.GetOptions{} + var originalNode *v1.Node + var err error + if tryNumber == 0 { - util.FromApiserverCache(&opts) + originalNode, err = kl.nodeLister.Get(string(kl.nodeName)) + } else { + opts := metav1.GetOptions{} + originalNode, err = kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts) } - originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts) if err != nil { return fmt.Errorf("error getting node %q: %v", kl.nodeName, err) } diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 61cb729e68385..938d432902974 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -38,6 +38,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/rand" @@ -160,6 +161,27 @@ func (lcm *localCM) GetCapacity(localStorageCapacityIsolation bool) v1.ResourceL return lcm.capacity } +type delegatingNodeLister struct { + client clientset.Interface +} + +func (l delegatingNodeLister) Get(name string) (*v1.Node, error) { + return l.client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{}) +} + +func (l delegatingNodeLister) List(selector labels.Selector) (ret []*v1.Node, err error) { + opts := metav1.ListOptions{} + if selector != nil { + opts.LabelSelector = selector.String() + } + nodeList, err := l.client.CoreV1().Nodes().List(context.Background(), opts) + if err != nil { + return nil, err + } + nodes := make([]*v1.Node, len(nodeList.Items)) + return nodes, nil +} + func TestUpdateNewNodeStatus(t *testing.T) { cases := []struct { desc string @@ -211,6 +233,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain + kubelet.nodeLister = delegatingNodeLister{client: kubeClient} machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", @@ -390,6 +413,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { }, } kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain + kubelet.nodeLister = delegatingNodeLister{client: kubeClient} machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", @@ -602,6 +626,7 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { kubeClient := testKubelet.fakeKubeClient existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain + kubelet.nodeLister = delegatingNodeLister{client: kubeClient} machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", @@ -824,6 +849,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) { kubeClient := testKubelet.fakeKubeClient existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}} kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain + kubelet.nodeLister = delegatingNodeLister{client: kubeClient} machineInfo := &cadvisorapi.MachineInfo{ MachineID: "123", SystemUUID: "abc", @@ -1108,6 +1134,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) { kubeClient := testKubelet.fakeKubeClient kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain + kubelet.nodeLister = delegatingNodeLister{client: kubeClient} // Execute assert.NoError(t, kubelet.updateNodeStatus(ctx)) @@ -1260,11 +1287,15 @@ func TestFastStatusUpdateOnce(t *testing.T) { return } - // patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches - require.Len(t, actions, 2*tc.wantPatches-1) + // patch, then patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches + expectedActions := 2*tc.wantPatches - 2 + if tc.wantPatches == 1 { + expectedActions = 1 + } + require.Len(t, actions, expectedActions) for i, action := range actions { - if i%2 == 1 { + if i%2 == 0 && i > 0 { require.IsType(t, core.GetActionImpl{}, action) continue } @@ -1566,11 +1597,11 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) { kubelet.updateRuntimeUp() assert.NoError(t, kubelet.updateNodeStatus(ctx)) actions := kubeClient.Actions() - require.Len(t, actions, 2) - require.True(t, actions[1].Matches("patch", "nodes")) - require.Equal(t, actions[1].GetSubresource(), "status") + require.Len(t, actions, 1) + require.True(t, actions[0].Matches("patch", "nodes")) + require.Equal(t, actions[0].GetSubresource(), "status") - updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch()) + updatedNode, err := applyNodeStatusPatch(&existingNode, actions[0].(core.PatchActionImpl).GetPatch()) assert.NoError(t, err) assert.True(t, apiequality.Semantic.DeepEqual(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable), "%s", diff.ObjectDiff(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable)) }