Skip to content

Commit

Permalink
fix: port k8s perf fix (#10213)
Browse files Browse the repository at this point in the history
Co-authored-by: Bradley Laney <[email protected]>
Co-authored-by: Amanda Vialva <[email protected]>
(cherry picked from commit 50a28cd)
  • Loading branch information
maxrussell authored and github-actions[bot] committed Nov 20, 2024
1 parent 0cc57df commit 542d39f
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 48 deletions.
175 changes: 136 additions & 39 deletions master/internal/rm/kubernetesrm/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
batchV1 "k8s.io/api/batch/v1"
k8sV1 "k8s.io/api/core/v1"
k8error "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,6 +38,9 @@ import (
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1"
alphaGateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1alpha2"

// Used to load all auth plugins.
_ "k8s.io/client-go/plugin/pkg/client/auth"

"github.com/determined-ai/determined/master/internal/config"
"github.com/determined-ai/determined/master/internal/db"
"github.com/determined-ai/determined/master/internal/rm"
Expand All @@ -51,9 +55,6 @@ import (
"github.com/determined-ai/determined/master/pkg/syncx/waitgroupx"
"github.com/determined-ai/determined/master/pkg/tasks"
"github.com/determined-ai/determined/proto/pkg/apiv1"

// Used to load all auth plugins.
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

const (
Expand Down Expand Up @@ -116,13 +117,15 @@ type jobsService struct {
internalTaskGWConfig *config.InternalTaskGatewayConfig

// System dependencies. Also set in initialization and never modified after.
syslog *logrus.Entry
clientSet k8sClient.Interface
syslog *logrus.Entry
clientSet k8sClient.Interface
// TODO(!!!): Not set in initialization and never changed anymore.. RIP.
podInterfaces map[string]typedV1.PodInterface
configMapInterfaces map[string]typedV1.ConfigMapInterface
jobInterfaces map[string]typedBatchV1.JobInterface
serviceInterfaces map[string]typedV1.ServiceInterface
tcpRouteInterfaces map[string]alphaGateway.TCPRouteInterface
// TODO(!!!): end.

resourceRequestQueue *requestQueue
requestQueueWorkers []*requestProcessingWorker
Expand Down Expand Up @@ -256,6 +259,7 @@ func newJobsService(
}

func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
// TODO(!!!): Prob one informer per cluster too.
for _, namespace := range ns {
// Since we don't want to do duplicate namespace informers, don't start any
// listeners or informers that have already been added to namespacesWithInformers.
Expand Down Expand Up @@ -351,6 +355,8 @@ func (j *jobsService) startClientSet(namespaces []string) error {
return fmt.Errorf("failed to initialize kubernetes clientSet: %w", err)
}

j.jobInterfaces[""] = j.clientSet.BatchV1().Jobs("")
j.podInterfaces[""] = j.clientSet.CoreV1().Pods("")
for _, ns := range namespaces {
j.podInterfaces[ns] = j.clientSet.CoreV1().Pods(ns)
j.configMapInterfaces[ns] = j.clientSet.CoreV1().ConfigMaps(ns)
Expand Down Expand Up @@ -397,7 +403,17 @@ func readClientConfig(kubeconfigPath string) (*rest.Config, error) {
// and it expects to find files:
// - /var/run/secrets/kubernetes.io/serviceaccount/token
// - /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
return rest.InClusterConfig()
c, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
if c.QPS == 0.0 {
c.QPS = 20
}
if c.Burst == 0 {
c.Burst = 100
}
return c, nil
}

if parts := strings.Split(kubeconfigPath, string(os.PathSeparator)); parts[0] == "~" {
Expand Down Expand Up @@ -1040,7 +1056,7 @@ func (j *jobsService) refreshPodStates(allocationID model.AllocationID) error {
return fmt.Errorf("failed to get namespaces for resource manager: %w", err)
}

for _, pod := range pods.Items {
for _, pod := range pods {
if !slices.Contains(ns, pod.Namespace) {
continue
}
Expand Down Expand Up @@ -1085,20 +1101,40 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
return j.getSlot(msg.AgentId, msg.SlotId)
}

func (j *jobsService) HealthStatus() model.HealthStatus {
j.mu.Lock()
defer j.mu.Unlock()
for _, podInterface := range j.podInterfaces {
_, err := podInterface.List(context.TODO(), metaV1.ListOptions{Limit: 1})
if err != nil {
j.syslog.WithError(err).Error("kubernetes resource manager marked as unhealthy")
return model.Unhealthy
}
return model.Healthy
func (j *jobsService) HealthStatus(ctx context.Context) model.HealthStatus {
if len(j.podInterfaces) == 0 {
logrus.Error("expected podInterface to be non empty")
return model.Unhealthy
}

logrus.Error("expected jobInterface to be non empty")
return model.Unhealthy
_, err := j.podInterfaces[""].List(ctx, metaV1.ListOptions{Limit: 1})
if k8error.IsForbidden(err) {
return j.healthStatusFallback(ctx)
} else if err != nil {
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
var g errgroup.Group
for n, podInterface := range j.podInterfaces {
if len(n) == 0 { // TODO: We store a non-namespaced client with key "".
continue
}
g.Go(func() error {
_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
if err != nil {
return err
}
return nil
})
}
err := g.Wait()
if err != nil {
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) startNodeInformer() error {
Expand Down Expand Up @@ -1493,7 +1529,7 @@ func (j *jobsService) releaseAllocationsOnDisabledNode(nodeName string) error {
}

notifiedAllocations := make(map[model.AllocationID]bool)
for _, pod := range pods.Items {
for _, pod := range pods {
jobName, ok := resolvePodJobName(&pod)
if !ok {
j.syslog.Debugf("found pod when disabling node without %s label", kubernetesJobNameLabel)
Expand Down Expand Up @@ -1586,7 +1622,6 @@ type computeUsageSummary struct {
slotsAvailable int
}

// TODO(!!!): good func comment.
func (j *jobsService) summarizeComputeUsage(poolName string) (*computeUsageSummary, error) {
summary, err := j.summarize()
if err != nil {
Expand Down Expand Up @@ -1926,7 +1961,6 @@ func (j *jobsService) summarizeClusterByNodes() map[string]model.AgentSummary {
}
podByNode[podInfo.nodeName] = append(podByNode[podInfo.nodeName], podInfo)
}

nodeToTasks, taskSlots := j.getNonDetSlots(j.slotType)
summary := make(map[string]model.AgentSummary, len(j.currentNodes))
for _, node := range j.currentNodes {
Expand Down Expand Up @@ -2042,7 +2076,7 @@ func (j *jobsService) getNonDetPods() ([]k8sV1.Pod, error) {
}

var nonDetPods []k8sV1.Pod
for _, p := range allPods.Items {
for _, p := range allPods {
_, isDet := p.Labels[determinedLabel]
_, isDetSystem := p.Labels[determinedSystemLabel]

Expand All @@ -2058,7 +2092,6 @@ func (j *jobsService) getNonDetPods() ([]k8sV1.Pod, error) {
func (j *jobsService) getNonDetSlots(deviceType device.Type) (map[string][]string, map[string]int64) {
nodeToTasks := make(map[string][]string, len(j.currentNodes))
taskSlots := make(map[string]int64)

nonDetPods, err := j.getNonDetPods()
if err != nil {
j.syslog.WithError(err).Warn("getting non determined pods, " +
Expand Down Expand Up @@ -2134,32 +2167,96 @@ func numSlots(slots model.SlotsSummary) int {
func (j *jobsService) listJobsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) ([]batchV1.Job, error) {
var res []batchV1.Job
for n, i := range j.jobInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
allJobs, err := j.jobInterfaces[""].List(ctx, opts)
if k8error.IsForbidden(err) {
return j.listJobsInAllNamespacesFallback(ctx, opts)
} else if err != nil {
logrus.WithError(err).WithField("function", "listJobsInAllNamespaces").Error("error listing jobs in all namespace")
return nil, err
}

res = append(res, pods.Items...)
namespaces := set.FromKeys(j.jobInterfaces)
var jobsWeCareAbout []batchV1.Job
for _, j := range allJobs.Items {
if namespaces.Contains(j.Namespace) {
jobsWeCareAbout = append(jobsWeCareAbout, j)
}
}
return jobsWeCareAbout, nil
}

func (j *jobsService) listJobsInAllNamespacesFallback(
ctx context.Context,
opts metaV1.ListOptions,
) ([]batchV1.Job, error) {
var g errgroup.Group
var res []batchV1.Job
var resLock sync.Mutex
for n, i := range j.jobInterfaces {
g.Go(func() error {
pods, err := i.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
resLock.Lock()
res = append(res, pods.Items...)
resLock.Unlock()
return nil
})
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}

func (j *jobsService) listPodsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) (*k8sV1.PodList, error) {
res := &k8sV1.PodList{}
for n, i := range j.podInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
) ([]k8sV1.Pod, error) {
allPods, err := j.podInterfaces[""].List(ctx, opts)
if k8error.IsForbidden(err) {
return j.listPodsInAllNamespacesFallback(ctx, opts)
} else if err != nil {
return nil, err
}

res.Items = append(res.Items, pods.Items...)
namespaces := set.FromKeys(j.podInterfaces)
var podsWeWant []k8sV1.Pod
for _, pod := range allPods.Items {
if namespaces.Contains(pod.Namespace) {
podsWeWant = append(podsWeWant, pod)
}
}
return podsWeWant, nil
}

func (j *jobsService) listPodsInAllNamespacesFallback(
ctx context.Context,
opts metaV1.ListOptions,
) ([]k8sV1.Pod, error) {
var g errgroup.Group
var res []k8sV1.Pod
var resLock sync.Mutex
for n, podInterface := range j.podInterfaces {
if len(n) == 0 {
continue
}
g.Go(func() error {
pods, err := podInterface.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
resLock.Lock()
res = append(res, pods.Items...)
resLock.Unlock()
return nil
})
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}

Expand Down
Loading

0 comments on commit 542d39f

Please sign in to comment.