Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: performance errors when listing pods #10199

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 87 additions & 16 deletions master/internal/rm/kubernetesrm/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
batchV1 "k8s.io/api/batch/v1"
k8sV1 "k8s.io/api/core/v1"
k8error "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -71,6 +72,8 @@ const (
ReleaseNamespaceEnvVar = "DET_RELEASE_NAMESPACE"
// ResourceTypeNvidia describes the GPU resource type.
ResourceTypeNvidia = "nvidia.com/gpu"
// DefaultClientBurst is the default Kubernetes burst limit for the k8s Go client.
DefaultClientBurst = 10
)

var cacheSyncs []cache.InformerSynced
Expand Down Expand Up @@ -348,6 +351,7 @@ func (j *jobsService) startClientSet(namespaces []string) error {
return fmt.Errorf("failed to initialize kubernetes clientSet: %w", err)
}

j.podInterfaces[""] = j.clientSet.CoreV1().Pods("")
for _, ns := range namespaces {
j.podInterfaces[ns] = j.clientSet.CoreV1().Pods(ns)
j.configMapInterfaces[ns] = j.clientSet.CoreV1().ConfigMaps(ns)
Expand Down Expand Up @@ -394,6 +398,12 @@ func readClientConfig(kubeconfigPath string) (*rest.Config, error) {
// and it expects to find files:
// - /var/run/secrets/kubernetes.io/serviceaccount/token
// - /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
c, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
c.QPS = 100
c.Burst = 100
return rest.InClusterConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this return c, err?

Copy link
Contributor Author

@amandavialva01 amandavialva01 Nov 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was ruled out as a viable solution before, I was testing solutions for a working fallback on a diff branch and I think i accidentally pushed this change here, I'll get rid of this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but yea it woudlve had to be return c, err

}

Expand Down Expand Up @@ -1082,20 +1092,45 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
return j.getSlot(msg.AgentId, msg.SlotId)
}

func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
g := errgroup.Group{}
for n, podInterface := range j.podInterfaces {
if len(n) == 0 {
continue
}
g.Go(func() error {
time.Sleep(time.Duration(200 * time.Millisecond))
_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
if err != nil {
return err
}
return nil
})
}
err := g.Wait()
if err != nil {
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) HealthStatus() model.HealthStatus {
j.mu.Lock()
defer j.mu.Unlock()
for _, podInterface := range j.podInterfaces {
_, err := podInterface.List(context.TODO(), metaV1.ListOptions{Limit: 1})
if err != nil {
j.syslog.WithError(err).Error("kubernetes resource manager marked as unhealthy")
return model.Unhealthy
}
return model.Healthy
ctx := context.TODO()
if len(j.podInterfaces) == 0 {
logrus.Error("expected podInterface to be non empty")
return model.Unhealthy
}

logrus.Error("expected jobInterface to be non empty")
return model.Unhealthy
_, err := j.podInterfaces[""].List(ctx, metaV1.ListOptions{Limit: 1})
if err != nil {
if k8error.IsForbidden(err) {
return j.healthStatusFallback(ctx)
}
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) startNodeInformer() error {
Expand Down Expand Up @@ -2114,20 +2149,56 @@ func (j *jobsService) listJobsInAllNamespaces(
return res, nil
}

func (j *jobsService) listImportantPods(ctx context.Context, opts metaV1.ListOptions) (*k8sV1.PodList, error) {
resLock := sync.Mutex{}
res := &k8sV1.PodList{}
g := errgroup.Group{}
// cnt := 0
for n, podInterface := range j.podInterfaces {
if len(n) == 0 {
continue
}
g.Go(func() error {
// time.Sleep(time.Duration(cnt/DefaultClientBurst) * 1050 * time.Millisecond)
pods, err := podInterface.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
resLock.Lock()
res.Items = append(res.Items, pods.Items...)
resLock.Unlock()
return nil
})
// cnt++
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}

func (j *jobsService) listPodsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) (*k8sV1.PodList, error) {
res := &k8sV1.PodList{}
for n, i := range j.podInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
allPods, err := j.podInterfaces[""].List(ctx, opts)
if err != nil {
if k8error.IsForbidden(err) {
return j.listImportantPods(ctx, opts)
}
return nil, err
}

res.Items = append(res.Items, pods.Items...)
var podsWeWant k8sV1.PodList
namespaces := set.FromKeys(j.podInterfaces)
for _, pod := range allPods.Items {
if namespaces.Contains(pod.Namespace) {
podsWeWant.Items = append(podsWeWant.Items, pod)
}
}

return res, nil
allPods.Items, podsWeWant.Items = podsWeWant.Items, allPods.Items
return allPods, nil
}

func (j *jobsService) listConfigMapsInAllNamespaces(
Expand Down
173 changes: 166 additions & 7 deletions master/internal/rm/kubernetesrm/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
package kubernetesrm

import (
"context"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
k8sV1 "k8s.io/api/core/v1"
k8error "k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"

Expand Down Expand Up @@ -60,24 +63,180 @@ func TestGetNonDetPods(t *testing.T) {
},
}

ns1 := &mocks.PodInterface{}
ns1.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0])}, nil)
emptyNS := &mocks.PodInterface{}
emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0], expectedPods[1])}, nil)

js := jobsService{
podInterfaces: map[string]typedV1.PodInterface{
"ns1": &mocks.PodInterface{},
"ns2": &mocks.PodInterface{},
"": emptyNS,
},
}

actualPods, err := js.getNonDetPods()
require.NoError(t, err)
require.ElementsMatch(t, expectedPods, actualPods)
}

func TestListPodsInAllNamespaces(t *testing.T) {
detPods := []k8sV1.Pod{
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns1",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns2",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
}

ns1 := &mocks.PodInterface{}
ns2 := &mocks.PodInterface{}
ns2.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[1])}, nil)

emptyNS := &mocks.PodInterface{}

js := jobsService{
podInterfaces: map[string]typedV1.PodInterface{
"ns1": ns1,
"ns2": ns2,
"": emptyNS,
},
}

actualPods, err := js.getNonDetPods()
// This pod is not part of js.podInterfaces.
outsidePod := k8sV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns3",
},
Spec: k8sV1.PodSpec{
NodeName: "b",
},
}

var expectedPods []k8sV1.Pod
copy(expectedPods, append(detPods, outsidePod))
expectedPodList := k8sV1.PodList{Items: expectedPods}
emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: expectedPods}, nil)

ctx := context.Background()
opts := metaV1.ListOptions{}
actualPodList, err := js.listPodsInAllNamespaces(ctx, opts)
require.NoError(t, err)
require.ElementsMatch(t, expectedPods, actualPods)
require.NotNil(t, actualPodList)
require.ElementsMatch(t, expectedPodList.Items, actualPodList.Items)

forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
fmt.Errorf("forbidden"))

emptyNS.On("List", mock.Anything, mock.Anything).Twice().
Return(nil, forbiddenErr)

ns1.On("List", mock.Anything, mock.Anything).Twice().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)

ns2.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)

actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
require.NoError(t, err)
require.NotNil(t, actualPodList)
require.ElementsMatch(t, detPods, actualPodList.Items)

listErr := fmt.Errorf("something bad happened")
ns2.On("List", mock.Anything, mock.Anything).Once().
Return(nil, listErr)
actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
require.ErrorIs(t, err, listErr)
require.Nil(t, actualPodList)
}

func TestHealthStatus(t *testing.T) {
detPods := []k8sV1.Pod{
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns1",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns2",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
}

ns1 := &mocks.PodInterface{}
ns2 := &mocks.PodInterface{}

emptyNS := &mocks.PodInterface{}

js := jobsService{
podInterfaces: map[string]typedV1.PodInterface{
"ns1": ns1,
"ns2": ns2,
"": emptyNS,
},
}

// This pod is not part of js.podInterfaces.
outsidePod := k8sV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns3",
},
Spec: k8sV1.PodSpec{
NodeName: "b",
},
}

var expectedPods []k8sV1.Pod
copy(expectedPods, append(detPods, outsidePod))
emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: expectedPods}, nil)

health := js.HealthStatus()
require.Equal(t, model.Healthy, health)

emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(nil, fmt.Errorf("couldnt list all pods"))

health = js.HealthStatus()
require.Equal(t, model.Unhealthy, health)

forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
fmt.Errorf("forbidden"))

emptyNS.On("List", mock.Anything, mock.Anything).Twice().
Return(nil, forbiddenErr)

ns1.On("List", mock.Anything, mock.Anything).Twice().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)

ns2.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)

health = js.HealthStatus()
require.Equal(t, model.Healthy, health)

ns2.On("List", mock.Anything, mock.Anything).Once().
Return(nil, fmt.Errorf("couldnt list pods in namespace ns2"))
health = js.HealthStatus()
require.Equal(t, model.Unhealthy, health)
}

func TestJobScheduledStatus(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,10 @@ func createMockJobsService(nodes map[string]*k8sV1.Node, devSlotType device.Type
jobsClientSet.On("CoreV1").Return(coreV1Interface)
jobsClientSet.On("BatchV1").Return(batchV1Interface)

emptyNS := &mocks.PodInterface{}
emptyNS.On("List", mock.Anything, mock.Anything).Return(&podsList, nil)

podInterfaces := map[string]typedV1.PodInterface{"": emptyNS}
return &jobsService{
namespace: "default",
clusterName: "",
Expand All @@ -1623,5 +1627,6 @@ func createMockJobsService(nodes map[string]*k8sV1.Node, devSlotType device.Type
slotResourceRequests: config.PodSlotResourceRequests{CPU: 2},
clientSet: jobsClientSet,
namespacesWithInformers: make(map[string]bool),
podInterfaces: podInterfaces,
}
}
Loading