diff --git a/cmd/clients.go b/cmd/clients.go new file mode 100644 index 00000000..6508361a --- /dev/null +++ b/cmd/clients.go @@ -0,0 +1,76 @@ +// Copyright (c) 2019 Palantir Technologies. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "context" + + clientset "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/clientset/versioned" + "github.com/palantir/k8s-spark-scheduler/config" + "github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +// AllClient holds all the k8s clients used +type AllClient struct { + APIExtensionsClient apiextensionsclientset.Interface + SparkSchedulerClient clientset.Interface + KubeClient kubernetes.Interface +} + +// GetClients creates AllClient given the passed in install config +func GetClients(ctx context.Context, install config.Install) (AllClient, error) { + var kubeconfig *rest.Config + var err error + if install.Kubeconfig != "" { + kubeconfig, err = clientcmd.BuildConfigFromFlags("", install.Kubeconfig) + if err != nil { + svc1log.FromContext(ctx).Error("Error building config from kubeconfig: %s", svc1log.Stacktrace(err)) + return AllClient{}, err + } + } else { + kubeconfig, err = rest.InClusterConfig() + if err != nil { + svc1log.FromContext(ctx).Error("Error building in cluster kubeconfig: %s", svc1log.Stacktrace(err)) + return AllClient{}, err + } + } + kubeconfig.QPS = install.QPS + kubeconfig.Burst = install.Burst + + kubeClient, err := kubernetes.NewForConfig(kubeconfig) + if err != nil { + svc1log.FromContext(ctx).Error("Error building kubernetes clientset: %s", svc1log.Stacktrace(err)) + return AllClient{}, err + } + sparkSchedulerClient, err := clientset.NewForConfig(kubeconfig) + if err != nil { + svc1log.FromContext(ctx).Error("Error building spark scheduler clientset: %s", svc1log.Stacktrace(err)) + return AllClient{}, err + } + apiExtensionsClient, err := apiextensionsclientset.NewForConfig(kubeconfig) + if err != nil { + svc1log.FromContext(ctx).Error("Error building api extensions clientset: %s", svc1log.Stacktrace(err)) + return AllClient{}, err + } + return AllClient{ + APIExtensionsClient: apiExtensionsClient, + SparkSchedulerClient: sparkSchedulerClient, + KubeClient: kubeClient, + }, nil +} diff --git a/cmd/server.go b/cmd/server.go index a5235bbe..161f0735 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -20,7 +20,6 @@ import ( "github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta1" "github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta2" - clientset "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/clientset/versioned" ssinformers "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/informers/externalversions" "github.com/palantir/k8s-spark-scheduler/config" "github.com/palantir/k8s-spark-scheduler/internal/cache" @@ -29,16 +28,13 @@ import ( "github.com/palantir/k8s-spark-scheduler/internal/extender" "github.com/palantir/k8s-spark-scheduler/internal/metrics" "github.com/palantir/k8s-spark-scheduler/internal/sort" + werror "github.com/palantir/witchcraft-go-error" "github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log" "github.com/palantir/witchcraft-go-logging/wlog/wapp" "github.com/palantir/witchcraft-go-server/witchcraft" "github.com/spf13/cobra" - apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" clientcache "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/clientcmd" ) var serverCmd = &cobra.Command{ @@ -54,58 +50,40 @@ func init() { } func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) { - var kubeconfig *rest.Config - var err error - install := info.InstallConfig.(config.Install) - if install.Kubeconfig != "" { - kubeconfig, err = clientcmd.BuildConfigFromFlags("", install.Kubeconfig) - if err != nil { - svc1log.FromContext(ctx).Error("Error building config from kubeconfig: %s", svc1log.Stacktrace(err)) - return nil, err - } - } else { - kubeconfig, err = rest.InClusterConfig() - if err != nil { - svc1log.FromContext(ctx).Error("Error building in cluster kubeconfig: %s", svc1log.Stacktrace(err)) - return nil, err - } + allClient, err := GetClients(ctx, install) + if err != nil { + return nil, err } - kubeconfig.QPS = install.QPS - kubeconfig.Burst = install.Burst + err = InitServerWithClients(ctx, info, allClient) + return nil, err +} + +// InitServerWithClients is exported for end to end testing +func InitServerWithClients(ctx context.Context, info witchcraft.InitInfo, allClient AllClient) error { + install := info.InstallConfig.(config.Install) instanceGroupLabel := install.InstanceGroupLabel if instanceGroupLabel == "" { // for back-compat, as instanceGroupLabel was once hard-coded to this value instanceGroupLabel = "resource_channel" } - kubeClient, err := kubernetes.NewForConfig(kubeconfig) - if err != nil { - svc1log.FromContext(ctx).Error("Error building kubernetes clientset: %s", svc1log.Stacktrace(err)) - return nil, err - } - sparkSchedulerClient, err := clientset.NewForConfig(kubeconfig) - if err != nil { - svc1log.FromContext(ctx).Error("Error building spark scheduler clientset: %s", svc1log.Stacktrace(err)) - return nil, err - } - apiExtensionsClient, err := apiextensionsclientset.NewForConfig(kubeconfig) - if err != nil { - svc1log.FromContext(ctx).Error("Error building api extensions clientset: %s", svc1log.Stacktrace(err)) - return nil, err - } + apiExtensionsClient := allClient.APIExtensionsClient + sparkSchedulerClient := allClient.SparkSchedulerClient + kubeClient := allClient.KubeClient + webhookClientConfig, err := conversionwebhook.InitializeCRDConversionWebhook(ctx, info.Router, install.Server, install.WebhookServiceConfig.Namespace, install.WebhookServiceConfig.ServiceName, install.WebhookServiceConfig.ServicePort) if err != nil { svc1log.FromContext(ctx).Error("Error instantiating CRD conversion webhook: %s", svc1log.Stacktrace(err)) - return nil, err + return err } err = crd.EnsureResourceReservationsCRD(ctx, apiExtensionsClient, install.ResourceReservationCRDAnnotations, v1beta2.ResourceReservationCustomResourceDefinition(webhookClientConfig, v1beta1.ResourceReservationCustomResourceDefinitionVersion()), ) if err != nil { svc1log.FromContext(ctx).Error("Error ensuring resource reservations v1beta2 CRD exists: %s", svc1log.Stacktrace(err)) - return nil, err + return err } kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30) @@ -143,7 +121,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) { podInformer.HasSynced, resourceReservationInformer.HasSynced); !ok { svc1log.FromContext(ctx).Error("Error waiting for cache to sync") - return nil, nil + return werror.ErrorWithContextParams(ctx, "could not sync") } resourceReservationCache, err := cache.NewResourceReservationCache( @@ -155,7 +133,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) { if err != nil { svc1log.FromContext(ctx).Error("Error constructing resource reservation cache", svc1log.Stacktrace(err)) - return nil, err + return err } lazyDemandInformer := crd.NewLazyDemandInformer( @@ -249,10 +227,10 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) { go unschedulablePodMarker.Start(ctx) if err := registerExtenderEndpoints(info.Router, sparkSchedulerExtender); err != nil { - return nil, err + return err } - return nil, nil + return nil } // New creates and returns a witchcraft Server.