From 21daf95f3661c67d560c7382264c484ee7a5178f Mon Sep 17 00:00:00 2001
From: Amanda Vialva
Date: Tue, 5 Nov 2024 11:59:08 -0500
Subject: [PATCH 1/2] fix: perf issue with too many API reqs when listing pods
 in all ns

---
 master/internal/rm/kubernetesrm/jobs.go       | 103 +++++++++---
 master/internal/rm/kubernetesrm/jobs_test.go  | 176 ++++++++++++++++++-
 .../kubernetes_resource_manager_intg_test.go  |   5 +
 3 files changed, 261 insertions(+), 23 deletions(-)

diff --git a/master/internal/rm/kubernetesrm/jobs.go b/master/internal/rm/kubernetesrm/jobs.go
index 52148a8cf29..5dbad7cb86e 100644
--- a/master/internal/rm/kubernetesrm/jobs.go
+++ b/master/internal/rm/kubernetesrm/jobs.go
@@ -17,6 +17,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/exp/maps"
+	"golang.org/x/sync/errgroup"
 	batchV1 "k8s.io/api/batch/v1"
 	k8sV1 "k8s.io/api/core/v1"
 	k8error "k8s.io/apimachinery/pkg/api/errors"
@@ -71,6 +72,8 @@ const (
 	ReleaseNamespaceEnvVar = "DET_RELEASE_NAMESPACE"
 	// ResourceTypeNvidia describes the GPU resource type.
 	ResourceTypeNvidia = "nvidia.com/gpu"
+	// DefaultClientBurst is the default Kubernetes burst limit for the k8s Go client.
+	DefaultClientBurst = 10
 )
 
 var cacheSyncs []cache.InformerSynced
@@ -348,6 +351,7 @@ func (j *jobsService) startClientSet(namespaces []string) error {
 		return fmt.Errorf("failed to initialize kubernetes clientSet: %w", err)
 	}
 
+	j.podInterfaces[""] = j.clientSet.CoreV1().Pods("")
 	for _, ns := range namespaces {
 		j.podInterfaces[ns] = j.clientSet.CoreV1().Pods(ns)
 		j.configMapInterfaces[ns] = j.clientSet.CoreV1().ConfigMaps(ns)
@@ -1082,20 +1086,48 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
 	return j.getSlot(msg.AgentId, msg.SlotId)
 }
 
+func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
+	g := errgroup.Group{}
+	cnt := 0
+	for n, podInterface := range j.podInterfaces {
+		if len(n) == 0 {
+			continue
+		}
+		cnt := cnt // Copy for the goroutine below; the outer cnt changes before it runs.
+		g.Go(func() error {
+			time.Sleep(time.Duration(cnt/DefaultClientBurst) * 1050 * time.Millisecond)
+			_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
+			if err != nil {
+				return err
+			}
+			return nil
+		})
+		cnt++
+	}
+	err := g.Wait()
+	if err != nil {
+		return model.Unhealthy
+	}
+	return model.Healthy
+}
+
 func (j *jobsService) HealthStatus() model.HealthStatus {
 	j.mu.Lock()
 	defer j.mu.Unlock()
-	for _, podInterface := range j.podInterfaces {
-		_, err := podInterface.List(context.TODO(), metaV1.ListOptions{Limit: 1})
-		if err != nil {
-			j.syslog.WithError(err).Error("kubernetes resource manager marked as unhealthy")
-			return model.Unhealthy
-		}
-		return model.Healthy
+	ctx := context.TODO()
+	if len(j.podInterfaces) == 0 {
+		logrus.Error("expected podInterfaces to be non-empty")
+		return model.Unhealthy
 	}
-	logrus.Error("expected jobInterface to be non empty")
-	return model.Unhealthy
+	_, err := j.podInterfaces[""].List(ctx, metaV1.ListOptions{Limit: 1})
+	if err != nil {
+		if k8error.IsForbidden(err) {
+			return j.healthStatusFallback(ctx)
+		}
+		return model.Unhealthy
+	}
+	return model.Healthy
 }
 
 func (j *jobsService) startNodeInformer() error {
@@ -2114,20 +2146,59 @@ func (j *jobsService) listJobsInAllNamespaces(
 	return res, nil
 }
 
+func (j *jobsService) listImportantPods(ctx context.Context, opts metaV1.ListOptions) (*k8sV1.PodList, error) {
+	resLock := sync.Mutex{}
+	res := &k8sV1.PodList{}
+	g := errgroup.Group{}
+	cnt := 0
+	for n, podInterface := range j.podInterfaces {
+		if len(n) == 0 {
+			continue
+		}
+		cnt := cnt // Copy for the goroutine below; the outer cnt changes before it runs.
+		g.Go(func() error {
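+			// Stagger the per-namespace List calls so at most DefaultClientBurst
+			// of them start per ~1.05s window, under the client-side rate limit.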
+			time.Sleep(time.Duration(cnt/DefaultClientBurst) * 1050 * time.Millisecond)
+			pods, err := podInterface.List(ctx, opts)
+			if err != nil {
+				return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
+			}
+			resLock.Lock()
+			res.Items = append(res.Items, pods.Items...)
+			resLock.Unlock()
+			return nil
+		})
+		cnt++
+	}
+	err := g.Wait()
+	if err != nil {
+		return nil, err
+	}
+	return res, nil
+}
+
 func (j *jobsService) listPodsInAllNamespaces(
 	ctx context.Context, opts metaV1.ListOptions,
 ) (*k8sV1.PodList, error) {
-	res := &k8sV1.PodList{}
-	for n, i := range j.podInterfaces {
-		pods, err := i.List(ctx, opts)
-		if err != nil {
-			return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
+	allPods, err := j.podInterfaces[""].List(ctx, opts)
+	if err != nil {
+		if k8error.IsForbidden(err) {
+			return j.listImportantPods(ctx, opts)
 		}
+		return nil, err
+	}
 
-		res.Items = append(res.Items, pods.Items...)
+	var podsWeWant k8sV1.PodList
+	namespaces := set.FromKeys(j.podInterfaces)
+	for _, pod := range allPods.Items {
+		if namespaces.Contains(pod.Namespace) {
+			podsWeWant.Items = append(podsWeWant.Items, pod)
+		}
 	}
-	return res, nil
+	allPods.Items = podsWeWant.Items
+	return allPods, nil
 }
 
 func (j *jobsService) listConfigMapsInAllNamespaces(
diff --git a/master/internal/rm/kubernetesrm/jobs_test.go b/master/internal/rm/kubernetesrm/jobs_test.go
index 23931152ce3..1894ab65a70 100644
--- a/master/internal/rm/kubernetesrm/jobs_test.go
+++ b/master/internal/rm/kubernetesrm/jobs_test.go
@@ -3,6 +3,7 @@
 package kubernetesrm
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"testing"
@@ -10,7 +11,9 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	k8sV1 "k8s.io/api/core/v1"
+	k8error "k8s.io/apimachinery/pkg/api/errors"
 	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 
@@ -60,24 +63,183 @@ func TestGetNonDetPods(t *testing.T) {
 		},
 	}
 
-	ns1 := &mocks.PodInterface{}
-	ns1.On("List", mock.Anything, mock.Anything).Once().
-		Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0])}, nil)
+	emptyNS := &mocks.PodInterface{}
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0], expectedPods[1])}, nil)
+
+	js := jobsService{
+		podInterfaces: map[string]typedV1.PodInterface{
+			"ns1": &mocks.PodInterface{},
+			"ns2": &mocks.PodInterface{},
+			"":    emptyNS,
+		},
+	}
+
+	actualPods, err := js.getNonDetPods()
+	require.NoError(t, err)
+	require.ElementsMatch(t, expectedPods, actualPods)
+}
+
+func TestListPodsInAllNamespaces(t *testing.T) {
+	detPods := []k8sV1.Pod{
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Name: "ns1",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Name: "ns2",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+	}
 
 	ns1 := &mocks.PodInterface{}
 	ns2 := &mocks.PodInterface{}
-	ns2.On("List", mock.Anything, mock.Anything).Once().
-		Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[1])}, nil)
+
+	emptyNS := &mocks.PodInterface{}
 
 	js := jobsService{
 		podInterfaces: map[string]typedV1.PodInterface{
 			"ns1": ns1,
 			"ns2": ns2,
+			"":    emptyNS,
 		},
 	}
 
-	actualPods, err := js.getNonDetPods()
+	// This pod is not part of js.podInterfaces.
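+	// Its Namespace field is empty, though, and "" is a podInterfaces key, so
+	// the happy-path filter in listPodsInAllNamespaces still keeps it.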
+	outsidePod := k8sV1.Pod{
+		ObjectMeta: metaV1.ObjectMeta{
+			Name: "ns3",
+		},
+		Spec: k8sV1.PodSpec{
+			NodeName: "b",
+		},
+	}
+
+	expectedPods := append([]k8sV1.Pod{}, detPods...)
+	expectedPods = append(expectedPods, outsidePod)
+	expectedPodList := k8sV1.PodList{Items: expectedPods}
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: expectedPods}, nil)
+
+	ctx := context.Background()
+	opts := metaV1.ListOptions{}
+	actualPodList, err := js.listPodsInAllNamespaces(ctx, opts)
 	require.NoError(t, err)
-	require.ElementsMatch(t, expectedPods, actualPods)
+	require.NotNil(t, actualPodList)
+	require.ElementsMatch(t, expectedPodList.Items, actualPodList.Items)
+
+	forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
+		fmt.Errorf("forbidden"))
+
+	emptyNS.On("List", mock.Anything, mock.Anything).Twice().
+		Return(nil, forbiddenErr)
+
+	ns1.On("List", mock.Anything, mock.Anything).Twice().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)
+
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)
+
+	actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
+	require.NoError(t, err)
+	require.NotNil(t, actualPodList)
+	require.ElementsMatch(t, detPods, actualPodList.Items)
+
+	listErr := fmt.Errorf("something bad happened")
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+		Return(nil, listErr)
+	actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
+	require.ErrorIs(t, err, listErr)
+	require.Nil(t, actualPodList)
+}
+
+func TestHealthStatus(t *testing.T) {
+	detPods := []k8sV1.Pod{
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Name: "ns1",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Name: "ns2",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+	}
+
+	ns1 := &mocks.PodInterface{}
+	ns2 := &mocks.PodInterface{}
+
+	emptyNS := &mocks.PodInterface{}
+
+	js := jobsService{
+		podInterfaces: map[string]typedV1.PodInterface{
+			"ns1": ns1,
+			"ns2": ns2,
+			"":    emptyNS,
+		},
+	}
+
+	// This pod is not part of js.podInterfaces.
+	outsidePod := k8sV1.Pod{
+		ObjectMeta: metaV1.ObjectMeta{
+			Name: "ns3",
+		},
+		Spec: k8sV1.PodSpec{
+			NodeName: "b",
+		},
+	}
+
+	expectedPods := append([]k8sV1.Pod{}, detPods...)
+	expectedPods = append(expectedPods, outsidePod)
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: expectedPods}, nil)
+
+	health := js.HealthStatus()
+	require.Equal(t, model.Healthy, health)
+
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(nil, fmt.Errorf("couldn't list all pods"))
+
+	health = js.HealthStatus()
+	require.Equal(t, model.Unhealthy, health)
+
+	forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
+		fmt.Errorf("forbidden"))
+
+	emptyNS.On("List", mock.Anything, mock.Anything).Twice().
+		Return(nil, forbiddenErr)
+
+	ns1.On("List", mock.Anything, mock.Anything).Twice().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)
+
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)
+
+	health = js.HealthStatus()
+	require.Equal(t, model.Healthy, health)
+
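+	// A non-Forbidden List error from any namespace must mark the service unhealthy.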
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+		Return(nil, fmt.Errorf("couldn't list pods in namespace ns2"))
+	health = js.HealthStatus()
+	require.Equal(t, model.Unhealthy, health)
 }
 
 func TestJobScheduledStatus(t *testing.T) {
diff --git a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go
index 3059e765722..9803b982a1d 100644
--- a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go
+++ b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go
@@ -1609,6 +1609,10 @@ func createMockJobsService(nodes map[string]*k8sV1.Node, devSlotType device.Type
 	jobsClientSet.On("CoreV1").Return(coreV1Interface)
 	jobsClientSet.On("BatchV1").Return(batchV1Interface)
 
+	emptyNS := &mocks.PodInterface{}
+	emptyNS.On("List", mock.Anything, mock.Anything).Return(&podsList, nil)
+
+	podInterfaces := map[string]typedV1.PodInterface{"": emptyNS}
 	return &jobsService{
 		namespace:   "default",
 		clusterName: "",
@@ -1623,5 +1627,6 @@ func createMockJobsService(nodes map[string]*k8sV1.Node, devSlotType device.Type
 		slotResourceRequests:    config.PodSlotResourceRequests{CPU: 2},
 		clientSet:               jobsClientSet,
 		namespacesWithInformers: make(map[string]bool),
+		podInterfaces:           podInterfaces,
 	}
 }

From 119f2831cbb9f959caca3848ace1b30541b62845 Mon Sep 17 00:00:00 2001
From: Amanda Vialva
Date: Wed, 6 Nov 2024 13:36:07 -0500
Subject: [PATCH 2/2] raise k8s client qps and burst limits

---
 master/internal/rm/kubernetesrm/jobs.go | 19 ++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/master/internal/rm/kubernetesrm/jobs.go b/master/internal/rm/kubernetesrm/jobs.go
index 5dbad7cb86e..ac1431f1576 100644
--- a/master/internal/rm/kubernetesrm/jobs.go
+++ b/master/internal/rm/kubernetesrm/jobs.go
@@ -398,6 +398,12 @@ func readClientConfig(kubeconfigPath string) (*rest.Config, error) {
 	// and it expects to find files:
 	//   - /var/run/secrets/kubernetes.io/serviceaccount/token
 	//   - /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
-	return rest.InClusterConfig()
+	c, err := rest.InClusterConfig()
+	if err != nil {
+		return nil, err
+	}
+	c.QPS = 100
+	c.Burst = 100
+	return c, nil
 }
 
@@ -1088,21 +1094,18 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
 
 func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
 	g := errgroup.Group{}
-	cnt := 0
 	for n, podInterface := range j.podInterfaces {
 		if len(n) == 0 {
 			continue
 		}
-		cnt := cnt // Copy for the goroutine below; the outer cnt changes before it runs.
 		g.Go(func() error {
-			time.Sleep(time.Duration(cnt/DefaultClientBurst) * 1050 * time.Millisecond)
+			time.Sleep(200 * time.Millisecond)
 			_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
 			if err != nil {
 				return err
 			}
 			return nil
 		})
-		cnt++
 	}
 	err := g.Wait()
 	if err != nil {
@@ -2150,16 +2153,11 @@ func (j *jobsService) listImportantPods(ctx context.Context, opts metaV1.ListOptions) (*k8sV1.PodList, error) {
 	resLock := sync.Mutex{}
 	res := &k8sV1.PodList{}
 	g := errgroup.Group{}
-	cnt := 0
 	for n, podInterface := range j.podInterfaces {
 		if len(n) == 0 {
 			continue
 		}
-		cnt := cnt // Copy for the goroutine below; the outer cnt changes before it runs.
 		g.Go(func() error {
-			// Stagger the per-namespace List calls so at most DefaultClientBurst
-			// of them start per ~1.05s window, under the client-side rate limit.
-			time.Sleep(time.Duration(cnt/DefaultClientBurst) * 1050 * time.Millisecond)
 			pods, err := podInterface.List(ctx, opts)
 			if err != nil {
 				return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
@@ -2169,7 +2167,6 @@ func (j *jobsService) listImportantPods(ctx context.Context, opts metaV1.ListOptions) (*k8sV1.PodList, error) {
 			resLock.Unlock()
 			return nil
 		})
-		cnt++
 	}
 	err := g.Wait()
 	if err != nil {