Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
improvement: Refactor server initialization to expose a plugin point …
Browse files Browse the repository at this point in the history
…to end to end tests (#253)
  • Loading branch information
k-simons authored Apr 12, 2023
1 parent a7122ad commit 9f256fb
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 43 deletions.
76 changes: 76 additions & 0 deletions cmd/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2019 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"

clientset "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/clientset/versioned"
"github.com/palantir/k8s-spark-scheduler/config"
"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// AllClient holds all the k8s clients used
type AllClient struct {
APIExtensionsClient apiextensionsclientset.Interface
SparkSchedulerClient clientset.Interface
KubeClient kubernetes.Interface
}

// GetClients creates AllClient given the passed in install config
func GetClients(ctx context.Context, install config.Install) (AllClient, error) {
var kubeconfig *rest.Config
var err error
if install.Kubeconfig != "" {
kubeconfig, err = clientcmd.BuildConfigFromFlags("", install.Kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building config from kubeconfig: %s", svc1log.Stacktrace(err))
return AllClient{}, err
}
} else {
kubeconfig, err = rest.InClusterConfig()
if err != nil {
svc1log.FromContext(ctx).Error("Error building in cluster kubeconfig: %s", svc1log.Stacktrace(err))
return AllClient{}, err
}
}
kubeconfig.QPS = install.QPS
kubeconfig.Burst = install.Burst

kubeClient, err := kubernetes.NewForConfig(kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building kubernetes clientset: %s", svc1log.Stacktrace(err))
return AllClient{}, err
}
sparkSchedulerClient, err := clientset.NewForConfig(kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building spark scheduler clientset: %s", svc1log.Stacktrace(err))
return AllClient{}, err
}
apiExtensionsClient, err := apiextensionsclientset.NewForConfig(kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building api extensions clientset: %s", svc1log.Stacktrace(err))
return AllClient{}, err
}
return AllClient{
APIExtensionsClient: apiExtensionsClient,
SparkSchedulerClient: sparkSchedulerClient,
KubeClient: kubeClient,
}, nil
}
64 changes: 21 additions & 43 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta1"
"github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta2"
clientset "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/clientset/versioned"
ssinformers "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/informers/externalversions"
"github.com/palantir/k8s-spark-scheduler/config"
"github.com/palantir/k8s-spark-scheduler/internal/cache"
Expand All @@ -29,16 +28,13 @@ import (
"github.com/palantir/k8s-spark-scheduler/internal/extender"
"github.com/palantir/k8s-spark-scheduler/internal/metrics"
"github.com/palantir/k8s-spark-scheduler/internal/sort"
werror "github.com/palantir/witchcraft-go-error"
"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
"github.com/palantir/witchcraft-go-logging/wlog/wapp"
"github.com/palantir/witchcraft-go-server/witchcraft"
"github.com/spf13/cobra"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)

var serverCmd = &cobra.Command{
Expand All @@ -54,58 +50,40 @@ func init() {
}

func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
var kubeconfig *rest.Config
var err error

install := info.InstallConfig.(config.Install)
if install.Kubeconfig != "" {
kubeconfig, err = clientcmd.BuildConfigFromFlags("", install.Kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building config from kubeconfig: %s", svc1log.Stacktrace(err))
return nil, err
}
} else {
kubeconfig, err = rest.InClusterConfig()
if err != nil {
svc1log.FromContext(ctx).Error("Error building in cluster kubeconfig: %s", svc1log.Stacktrace(err))
return nil, err
}
allClient, err := GetClients(ctx, install)
if err != nil {
return nil, err
}
kubeconfig.QPS = install.QPS
kubeconfig.Burst = install.Burst
err = InitServerWithClients(ctx, info, allClient)
return nil, err
}

// InitServerWithClients is exported for end to end testing
func InitServerWithClients(ctx context.Context, info witchcraft.InitInfo, allClient AllClient) error {
install := info.InstallConfig.(config.Install)
instanceGroupLabel := install.InstanceGroupLabel
if instanceGroupLabel == "" {
// for back-compat, as instanceGroupLabel was once hard-coded to this value
instanceGroupLabel = "resource_channel"
}

kubeClient, err := kubernetes.NewForConfig(kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building kubernetes clientset: %s", svc1log.Stacktrace(err))
return nil, err
}
sparkSchedulerClient, err := clientset.NewForConfig(kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building spark scheduler clientset: %s", svc1log.Stacktrace(err))
return nil, err
}
apiExtensionsClient, err := apiextensionsclientset.NewForConfig(kubeconfig)
if err != nil {
svc1log.FromContext(ctx).Error("Error building api extensions clientset: %s", svc1log.Stacktrace(err))
return nil, err
}
apiExtensionsClient := allClient.APIExtensionsClient
sparkSchedulerClient := allClient.SparkSchedulerClient
kubeClient := allClient.KubeClient

webhookClientConfig, err := conversionwebhook.InitializeCRDConversionWebhook(ctx, info.Router, install.Server,
install.WebhookServiceConfig.Namespace, install.WebhookServiceConfig.ServiceName, install.WebhookServiceConfig.ServicePort)
if err != nil {
svc1log.FromContext(ctx).Error("Error instantiating CRD conversion webhook: %s", svc1log.Stacktrace(err))
return nil, err
return err
}
err = crd.EnsureResourceReservationsCRD(ctx, apiExtensionsClient, install.ResourceReservationCRDAnnotations,
v1beta2.ResourceReservationCustomResourceDefinition(webhookClientConfig, v1beta1.ResourceReservationCustomResourceDefinitionVersion()),
)
if err != nil {
svc1log.FromContext(ctx).Error("Error ensuring resource reservations v1beta2 CRD exists: %s", svc1log.Stacktrace(err))
return nil, err
return err
}

kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)
Expand Down Expand Up @@ -143,7 +121,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
podInformer.HasSynced,
resourceReservationInformer.HasSynced); !ok {
svc1log.FromContext(ctx).Error("Error waiting for cache to sync")
return nil, nil
return werror.ErrorWithContextParams(ctx, "could not sync")
}

resourceReservationCache, err := cache.NewResourceReservationCache(
Expand All @@ -155,7 +133,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {

if err != nil {
svc1log.FromContext(ctx).Error("Error constructing resource reservation cache", svc1log.Stacktrace(err))
return nil, err
return err
}

lazyDemandInformer := crd.NewLazyDemandInformer(
Expand Down Expand Up @@ -249,10 +227,10 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
go unschedulablePodMarker.Start(ctx)

if err := registerExtenderEndpoints(info.Router, sparkSchedulerExtender); err != nil {
return nil, err
return err
}

return nil, nil
return nil
}

// New creates and returns a witchcraft Server.
Expand Down

0 comments on commit 9f256fb

Please sign in to comment.