improvement: Add the basic framework for an ete test and add a test for checking static compaction (#264)
k-simons authored Apr 19, 2023
1 parent b39c14f commit 5b3546a
Showing 31 changed files with 9,714 additions and 38 deletions.
156 changes: 156 additions & 0 deletions cmd/integration/common.go
@@ -0,0 +1,156 @@
// Copyright (c) 2019 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"testing"
"time"

"github.com/palantir/k8s-spark-scheduler/cmd"
schedulerconfig "github.com/palantir/k8s-spark-scheduler/config"
"github.com/palantir/k8s-spark-scheduler/internal/extender"
"github.com/palantir/witchcraft-go-logging/wlog"
"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
"github.com/palantir/witchcraft-go-logging/wlog/wapp"
"github.com/palantir/witchcraft-go-server/config"
"github.com/palantir/witchcraft-go-server/witchcraft"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestSetup holds the extender, root context, and cleanup function produced by setting up the server
type TestSetup struct {
ref *extender.SparkSchedulerExtender
ctx context.Context
cleanup func()
}

// SetUpServer sets up a WitchcraftServer and invokes the init function with the provided client set
func SetUpServer(ctx context.Context, t *testing.T, installConfig schedulerconfig.Install, allClients cmd.AllClient) TestSetup {
var ref *extender.SparkSchedulerExtender
var rootCtx context.Context
server := witchcraft.NewServer().
WithInstallConfigType(schedulerconfig.Install{}).
WithInstallConfig(installConfig).
WithSelfSignedCertificate().
WithRuntimeConfig(config.Runtime{
LoggerConfig: &config.LoggerConfig{
Level: wlog.DebugLevel,
},
}).
WithDisableGoRuntimeMetrics().
WithInitFunc(func(ctx context.Context, initInfo witchcraft.InitInfo) (func(), error) {
rootCtx = ctx
f := func(ctx context.Context) error {
var err error
ref, err = cmd.InitServerWithClients(ctx, initInfo, allClients)
require.NoError(t, err)
return nil
}
err := wapp.RunWithFatalLogging(ctx, f)
require.NoError(t, err)
return nil, nil
})
go func() {
err := server.Start()
require.NoError(t, err)
}()

// wait for the CRD that the server ensures exists to be created
waitForCondition(ctx, t, func() bool {
crds, err := allClients.APIExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().List(context.Background(), metav1.ListOptions{})
require.NoError(t, err)
return len(crds.Items) == 1
})

// Update that CRD to be ready
crds, err := allClients.APIExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().List(context.Background(), metav1.ListOptions{})
require.NoError(t, err)
for _, crd := range crds.Items {
crd.Status.Conditions = []apiextensionsv1.CustomResourceDefinitionCondition{
{
Type: apiextensionsv1.Established,
Status: apiextensionsv1.ConditionTrue,
},
}
_, err = allClients.APIExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.Background(), &crd, metav1.UpdateOptions{})
require.NoError(t, err)
}

// wait until the extender has been initialized so it can be returned
waitForCondition(ctx, t, func() bool {
return ref != nil
})

cleanup := func() {
if err := server.Close(); err != nil {
svc1log.FromContext(ctx).Error(err.Error(), svc1log.Stacktrace(err))
}
}

return TestSetup{
ref: ref,
ctx: rootCtx,
cleanup: cleanup,
}
}

// ToResource returns a pointer to the given Quantity
func ToResource(parse resource.Quantity) *resource.Quantity {
return &parse
}

func waitForCondition(ctx context.Context, t *testing.T, condition func() bool) {
ticker := time.NewTicker(time.Millisecond * 10)
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5))
defer ticker.Stop()
defer cancel()
for {
select {
case <-ctx.Done():
require.Fail(t, "condition was not met before the deadline")
return
case <-ticker.C:
checkCorrect := condition()
if checkCorrect {
return
}
}
}
}

// GetAffinityForInstanceGroup creates an Affinity for an instance group
func GetAffinityForInstanceGroup(labelKey string, instanceGroupName string) *v1.Affinity {
return &v1.Affinity{
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: labelKey,
Values: []string{instanceGroupName},
},
},
},
},
},
},
}
}
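
A minimal usage sketch (hypothetical test, not part of this commit) of how the helpers above fit together; the fake clientsets and install config mirror those used in server_test.go below.

package integration

import (
	"context"
	"testing"

	ssclientset "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/clientset/versioned/fake"
	"github.com/palantir/k8s-spark-scheduler/cmd"
	schedulerconfig "github.com/palantir/k8s-spark-scheduler/config"
	"github.com/palantir/k8s-spark-scheduler/internal/binpacker"
	"github.com/palantir/witchcraft-go-server/config"
	apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
	k8sfake "k8s.io/client-go/kubernetes/fake"
)

// Test_UsageSketch is a hypothetical example of using the test framework; it is not part of this commit.
func Test_UsageSketch(t *testing.T) {
	// Fake clientsets stand in for a real cluster.
	allClients := cmd.AllClient{
		APIExtensionsClient:  apiextensionsfake.NewSimpleClientset(),
		SparkSchedulerClient: ssclientset.NewSimpleClientset(),
		KubeClient:           k8sfake.NewSimpleClientset(),
	}
	installConfig := schedulerconfig.Install{
		Install:     config.Install{UseConsoleLog: true},
		BinpackAlgo: binpacker.SingleAzMinimalFragmentation,
	}
	// Start the extender server against the fakes and tear it down when the test ends.
	testSetup := SetUpServer(context.Background(), t, installConfig, allClients)
	defer testSetup.cleanup()
	// testSetup.ref is the running SparkSchedulerExtender; drive it with
	// schedulerapi.ExtenderArgs as Test_StaticCompaction does below.
	_ = testSetup.ref
}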
198 changes: 198 additions & 0 deletions cmd/integration/server_test.go
@@ -0,0 +1,198 @@
// Copyright (c) 2019 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"fmt"
"reflect"
"testing"

"github.com/palantir/k8s-spark-scheduler-lib/pkg/apis/sparkscheduler/v1beta2"
ssclientset "github.com/palantir/k8s-spark-scheduler-lib/pkg/client/clientset/versioned/fake"
"github.com/palantir/k8s-spark-scheduler/cmd"
schedulerconfig "github.com/palantir/k8s-spark-scheduler/config"
"github.com/palantir/k8s-spark-scheduler/internal/binpacker"
"github.com/palantir/k8s-spark-scheduler/internal/common"
"github.com/palantir/witchcraft-go-server/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"
schedulerapi "k8s.io/kube-scheduler/extender/v1"
)

// Test_StaticCompaction tests that an existing static driver pod and its executors can be re-scheduled, with a replacement executor pod taking over the reservation of a missing executor
func Test_StaticCompaction(t *testing.T) {
rr := v1beta2.ResourceReservation{
ObjectMeta: metav1.ObjectMeta{
Name: "appID1",
Namespace: "podNamespace",
},
Spec: v1beta2.ResourceReservationSpec{
Reservations: map[string]v1beta2.Reservation{
"driver": {
Node: "n1",
Resources: map[string]*resource.Quantity{
"cpu": ToResource(resource.MustParse("1")),
"memory": ToResource(resource.MustParse("1024Mi")),
},
},
"executor-1": {
Node: "n1",
Resources: map[string]*resource.Quantity{
"cpu": ToResource(resource.MustParse("2")),
"memory": ToResource(resource.MustParse("4096Mi")),
},
},
"executor-2": {
Node: "n1",
Resources: map[string]*resource.Quantity{
"cpu": ToResource(resource.MustParse("2")),
"memory": ToResource(resource.MustParse("4096Mi")),
},
},
},
},
Status: v1beta2.ResourceReservationStatus{
Pods: map[string]string{
"driver": "my-pod-driver",
"executor-1": "my-pod-executor-1",
"executor-2": "missing-pod-1",
},
},
}
driverPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-pod-driver",
Namespace: "podNamespace",
Labels: map[string]string{
common.SparkRoleLabel: common.Driver,
common.SparkAppIDLabel: "appID1",
},
Annotations: map[string]string{
common.DriverCPU: "1",
common.DriverMemory: "1024Mi",
common.ExecutorCPU: "2",
common.ExecutorMemory: "4096Mi",
common.ExecutorCount: "2",
},
},
Spec: v1.PodSpec{
NodeName: "n1",
SchedulerName: common.SparkSchedulerName,
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
existingNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "n1",
Labels: map[string]string{
v1.LabelTopologyZone: "zone1",
v1.LabelFailureDomainBetaZone: "zone1",
"resource_channel": "",
},
},
Status: v1.NodeStatus{
Capacity: map[v1.ResourceName]resource.Quantity{
"cpu": resource.MustParse("32"),
"memory": resource.MustParse("28192Mi"),
},
Allocatable: map[v1.ResourceName]resource.Quantity{
"cpu": resource.MustParse("32"),
"memory": resource.MustParse("28192Mi"),
},
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
},
},
},
}

executor1Pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "my-pod-executor-1",
Namespace: "podNamespace",
Labels: map[string]string{
common.SparkRoleLabel: common.Executor,
common.SparkAppIDLabel: "appID1",
},
},
Spec: v1.PodSpec{
NodeName: "n1",
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
allClients := cmd.AllClient{
APIExtensionsClient: apiextensionsfake.NewSimpleClientset(),
SparkSchedulerClient: ssclientset.NewSimpleClientset(&rr),
KubeClient: k8sfake.NewSimpleClientset(existingNode, driverPod, executor1Pod),
}

installConfig := schedulerconfig.Install{
Install: config.Install{
UseConsoleLog: true,
},
ShouldScheduleDynamicallyAllocatedExecutorsInSameAZ: true,
BinpackAlgo: binpacker.SingleAzMinimalFragmentation,
}
testSetup := SetUpServer(context.Background(), t, installConfig, allClients)
ctx := testSetup.ctx
defer testSetup.cleanup()
nodeNames := []string{existingNode.Name}
args := schedulerapi.ExtenderArgs{
Pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "new-podName",
Namespace: "podNamespace",
Labels: map[string]string{
common.SparkRoleLabel: common.Executor,
common.SparkAppIDLabel: "appID1",
},
},
Spec: v1.PodSpec{
Affinity: GetAffinityForInstanceGroup("resource_channel", "desiredInstanceGroup"),
},
},
NodeNames: &nodeNames,
}
// Ensure we return the correct node
filterResult := testSetup.ref.Predicate(ctx, args)
returnedNodeNames := *filterResult.NodeNames
assert.Equal(t, 1, len(returnedNodeNames))
assert.Equal(t, "n1", returnedNodeNames[0])
// ...and that the reservation status was updated to point at the new executor pod
checkTheReservation := func() bool {
existingRR, err := allClients.SparkSchedulerClient.SparkschedulerV1beta2().ResourceReservations(rr.Namespace).Get(ctx, rr.Name, metav1.GetOptions{})
require.NoError(t, err)
fmt.Println(existingRR.Status.Pods)
return reflect.DeepEqual(existingRR.Status.Pods, map[string]string{
"driver": "my-pod-driver",
"executor-1": "my-pod-executor-1",
"executor-2": "new-podName",
})
}
waitForCondition(ctx, t, checkTheReservation)
}
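
Assuming a standard Go toolchain setup (no real cluster is needed, since all of the clients above are fakes), this end-to-end test would typically be run with something like go test ./cmd/integration/ -run Test_StaticCompaction -v.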
