From d7299ad5ce6115a9746beed08679763303ed4ada Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 28 Mar 2024 12:39:22 +0800 Subject: [PATCH 1/7] *: introduce contrib folder contrib contains all the binaries about test suites or how to warmup things. Signed-off-by: Wei Fu --- Dockerfile | 1 + Makefile | 10 +++++- contrib/cmd/runkperf/commands/root.go | 46 +++++++++++++++++++++++++++ contrib/cmd/runkperf/main.go | 16 ++++++++++ 4 files changed, 72 insertions(+), 1 deletion(-) create mode 100644 contrib/cmd/runkperf/commands/root.go create mode 100644 contrib/cmd/runkperf/main.go diff --git a/Dockerfile b/Dockerfile index f8ce0a4..52aea6f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,4 +17,5 @@ RUN apt update -y && apt install curl -y WORKDIR / COPY --from=build-stage /output/bin/kperf /kperf +COPY --from=build-stage /output/bin/runkperf /runkperf COPY scripts/run_runner.sh /run_runner.sh diff --git a/Makefile b/Makefile index 8ee7855..865dcf1 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ COMMANDS=kperf +CONTRIB_COMMANDS=runkperf # PREFIX is base path to install. PREFIX ?= /usr/local @@ -11,6 +12,7 @@ IMAGE_TAG ?= latest IMAGE_NAME = $(IMAGE_REPO)/kperf:$(IMAGE_TAG) BINARIES=$(addprefix bin/,$(COMMANDS)) +CONTRIB_BINARIES=$(addprefix bin/contrib/,$(CONTRIB_COMMANDS)) # default recipe is build .DEFAULT_GOAL := build @@ -19,14 +21,20 @@ BINARIES=$(addprefix bin/,$(COMMANDS)) ALWAYS: bin/%: cmd/% ALWAYS + @echo $@ @CGO_ENABLED=0 go build -o $@ ${GO_BUILDTAGS} ./$< -build: $(BINARIES) ## build binaries +bin/contrib/%: contrib/cmd/% ALWAYS + @echo $@ + @CGO_ENABLED=0 go build -o $@ ${GO_BUILDTAGS} ./$< + +build: $(BINARIES) $(CONTRIB_BINARIES) ## build binaries @echo "$@" install: ## install binaries @install -d $(PREFIX)/bin @install $(BINARIES) $(PREFIX)/bin + @install $(CONTRIB_BINARIES) $(PREFIX)/bin image-build: ## build image @echo building ${IMAGE_NAME} diff --git a/contrib/cmd/runkperf/commands/root.go b/contrib/cmd/runkperf/commands/root.go new file mode 100644 index 0000000..fdbd59d --- /dev/null +++ b/contrib/cmd/runkperf/commands/root.go @@ -0,0 +1,46 @@ +package commands + +import ( + "flag" + "fmt" + "os" + "strconv" + + "github.com/urfave/cli" + "k8s.io/klog/v2" +) + +// App returns kperf application. +func App() *cli.App { + return &cli.App{ + Name: "runkperf", + // TODO: add more fields + Commands: []cli.Command{}, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "v", + Usage: "log level for V logs", + Value: "0", + }, + }, + Before: func(cliCtx *cli.Context) error { + return initKlog(cliCtx) + }, + } +} + +// initKlog initializes klog. 
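+// It validates the value of the global --v flag and forwards it to klog's
+// own flag set, so the chosen verbosity applies to all klog output.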
+func initKlog(cliCtx *cli.Context) error {
+    klogFlagset := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
+    klog.InitFlags(klogFlagset)
+
+    vInStr := cliCtx.GlobalString("v")
+    if vFlag, err := strconv.Atoi(vInStr); err != nil || vFlag < 0 {
+        return fmt.Errorf("invalid value \"%v\" for flag -v: value must be a non-negative integer", vInStr)
+    }
+
+    if err := klogFlagset.Set("v", vInStr); err != nil {
+        return fmt.Errorf("failed to set log level: %w", err)
+    }
+    return nil
+}
diff --git a/contrib/cmd/runkperf/main.go b/contrib/cmd/runkperf/main.go
new file mode 100644
index 0000000..d41462f
--- /dev/null
+++ b/contrib/cmd/runkperf/main.go
@@ -0,0 +1,16 @@
+package main
+
+import (
+    "fmt"
+    "os"
+
+    "github.com/Azure/kperf/contrib/cmd/runkperf/commands"
+)
+
+func main() {
+    app := commands.App()
+    if err := app.Run(os.Args); err != nil {
+        fmt.Fprintf(os.Stderr, "%s: %v\n", app.Name, err)
+        os.Exit(1)
+    }
+}

From 96fdaf11961f01d0448fa24f7eef09a4618f0536 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Thu, 28 Mar 2024 17:08:56 +0800
Subject: [PATCH 2/7] cmd/kperf: improve error message for new nodepool

Signed-off-by: Wei Fu
---
 cmd/kperf/commands/virtualcluster/nodepool.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/kperf/commands/virtualcluster/nodepool.go b/cmd/kperf/commands/virtualcluster/nodepool.go
index 917e72c..aad8234 100644
--- a/cmd/kperf/commands/virtualcluster/nodepool.go
+++ b/cmd/kperf/commands/virtualcluster/nodepool.go
@@ -72,7 +72,7 @@ var nodepoolAddCommand = cli.Command{
     },
     Action: func(cliCtx *cli.Context) error {
         if cliCtx.NArg() != 1 {
-            return fmt.Errorf("required only one argument as nodepool name")
+            return fmt.Errorf("required only one argument as nodepool name: %v", cliCtx.Args())
         }
         nodepoolName := strings.TrimSpace(cliCtx.Args().Get(0))
         if len(nodepoolName) == 0 {

From 5c0cc920ae96b88a10523f04e0f992c88326d74f Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Thu, 28 Mar 2024 17:10:17 +0800
Subject: [PATCH 3/7] contrib: introduce internal package mountns

Signed-off-by: Wei Fu
---
 contrib/internal/mountns/ns.go | 39 ++++++++++++++++++++++++++++++++++
 1 file changed, 39 insertions(+)
 create mode 100644 contrib/internal/mountns/ns.go

diff --git a/contrib/internal/mountns/ns.go b/contrib/internal/mountns/ns.go
new file mode 100644
index 0000000..8d12f4b
--- /dev/null
+++ b/contrib/internal/mountns/ns.go
@@ -0,0 +1,39 @@
+package mountns
+
+import (
+    "fmt"
+    "runtime"
+    "sync"
+
+    "golang.org/x/sys/unix"
+)
+
+// Executes runs the closure in a new mount namespace.
+//
+// NOTE: The closure must not call runtime.UnlockOSThread or spawn new
+// goroutines, because doing so is risky. The thread that entered the new
+// mount namespace stays locked and is cleaned up by the Go runtime when the
+// goroutine exits without unlocking it.
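+//
+// The closure runs on a dedicated goroutine whose OS thread is locked and
+// then detached via unshare(CLONE_FS | CLONE_NEWNS); the returned error is
+// either the unshare failure or whatever the closure itself returns.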
+func Executes(run func() error) error { + var wg sync.WaitGroup + wg.Add(1) + + var innerErr error + go func() (retErr error) { + defer wg.Done() + + defer func() { + innerErr = retErr + }() + + runtime.LockOSThread() + + err := unix.Unshare(unix.CLONE_FS | unix.CLONE_NEWNS) + if err != nil { + return fmt.Errorf("failed to create a new mount namespace: %w", err) + } + return run() + }() + wg.Wait() + + return innerErr +} From db69cf877599e00dcb002dc52e3c8b3c0c74a432 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 28 Mar 2024 17:10:43 +0800 Subject: [PATCH 4/7] contrib: introduce internal pkg utils Signed-off-by: Wei Fu --- contrib/internal/utils/kperf_cmd.go | 99 +++++++++++++++ contrib/internal/utils/kubectl_cmd.go | 173 ++++++++++++++++++++++++++ contrib/internal/utils/utils.go | 137 ++++++++++++++++++++ 3 files changed, 409 insertions(+) create mode 100644 contrib/internal/utils/kperf_cmd.go create mode 100644 contrib/internal/utils/kubectl_cmd.go create mode 100644 contrib/internal/utils/utils.go diff --git a/contrib/internal/utils/kperf_cmd.go b/contrib/internal/utils/kperf_cmd.go new file mode 100644 index 0000000..5add7c3 --- /dev/null +++ b/contrib/internal/utils/kperf_cmd.go @@ -0,0 +1,99 @@ +package utils + +import ( + "context" + "fmt" + "time" +) + +// KperfRunner is the wrapper of exec.Command to execute kperf command. +type KperfRunner struct { + kubeCfgPath string + runnerImage string +} + +func NewKperfRunner(kubeCfgPath string, runnerImage string) *KperfRunner { + return &KperfRunner{ + kubeCfgPath: kubeCfgPath, + runnerImage: runnerImage, + } +} + +// NewNodepool creates new virtual nodepool. +func (kr *KperfRunner) NewNodepool( + ctx context.Context, + timeout time.Duration, + name string, nodes int, maxPods int, + affinity string, + sharedProviderID string, +) error { + args := []string{"vc", "nodepool"} + if kr.kubeCfgPath != "" { + args = append(args, fmt.Sprintf("--kubeconfig=%s", kr.kubeCfgPath)) + } + args = append(args, "add", name, + fmt.Sprintf("--nodes=%v", nodes), + fmt.Sprintf("--cpu=%v", 32), + fmt.Sprintf("--memory=%v", 96), + fmt.Sprintf("--max-pods=%v", maxPods), + fmt.Sprintf("--affinity=%v", affinity), + fmt.Sprintf("--shared-provider-id=%v", sharedProviderID), + ) + + _, err := runCommand(ctx, timeout, "kperf", args) + return err +} + +// DeleteNodepool deletes a virtual nodepool by a given name. +func (kr *KperfRunner) DeleteNodepool(ctx context.Context, timeout time.Duration, name string) error { + args := []string{"vc", "nodepool"} + if kr.kubeCfgPath != "" { + args = append(args, fmt.Sprintf("--kubeconfig=%s", kr.kubeCfgPath)) + } + args = append(args, "delete", name) + + _, err := runCommand(ctx, timeout, "kperf", args) + return err +} + +// RGRun deploys runner group into kubernetes cluster. +func (kr *KperfRunner) RGRun(ctx context.Context, timeout time.Duration, rgCfgPath, flowcontrol string) error { + args := []string{"rg"} + if kr.kubeCfgPath != "" { + args = append(args, fmt.Sprintf("--kubeconfig=%s", kr.kubeCfgPath)) + } + args = append(args, "run", + fmt.Sprintf("--runnergroup=file://%v", rgCfgPath), + fmt.Sprintf("--runner-image=%v", kr.runnerImage), + ) + if flowcontrol != "" { + args = append(args, fmt.Sprintf("--runner-flowcontrol=%v", flowcontrol)) + } + + _, err := runCommand(ctx, timeout, "kperf", args) + return err +} + +// RGResult fetches runner group's result. 
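+// It shells out to `kperf rg result` (adding --kubeconfig when configured)
+// with the given timeout and returns the command's combined output.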
+func (kr *KperfRunner) RGResult(ctx context.Context, timeout time.Duration) (string, error) { + args := []string{"rg"} + if kr.kubeCfgPath != "" { + args = append(args, fmt.Sprintf("--kubeconfig=%s", kr.kubeCfgPath)) + } + args = append(args, "result") + + data, err := runCommand(ctx, timeout, "kperf", args) + return string(data), err +} + +// RGDelete deletes runner group. +func (kr *KperfRunner) RGDelete(ctx context.Context, timeout time.Duration) error { + args := []string{"rg"} + if kr.kubeCfgPath != "" { + args = append(args, fmt.Sprintf("--kubeconfig=%s", kr.kubeCfgPath)) + } + args = append(args, "delete") + + _, err := runCommand(ctx, timeout, "kperf", args) + return err +} diff --git a/contrib/internal/utils/kubectl_cmd.go b/contrib/internal/utils/kubectl_cmd.go new file mode 100644 index 0000000..8942f10 --- /dev/null +++ b/contrib/internal/utils/kubectl_cmd.go @@ -0,0 +1,173 @@ +package utils + +import ( + "context" + "fmt" + "net/url" + "strings" + "time" + + "github.com/Azure/kperf/contrib/internal/mountns" + "golang.org/x/sys/unix" + + "k8s.io/klog/v2" +) + +// KubectlRunner is the wrapper of exec.Command to execute kubectl command. +type KubectlRunner struct { + kubeCfgPath string + namespace string +} + +func NewKubectlRunner(kubeCfgPath string, namespace string) *KubectlRunner { + return &KubectlRunner{ + kubeCfgPath: kubeCfgPath, + namespace: namespace, + } +} + +// FQDN returns the FQDN of the cluster. +func (kr *KubectlRunner) FQDN(ctx context.Context, timeout time.Duration) (string, error) { + args := []string{} + if kr.kubeCfgPath != "" { + args = append(args, "--kubeconfig", kr.kubeCfgPath) + } + args = append(args, "cluster-info") + + data, err := runCommand(ctx, timeout, "kubectl", args) + if err != nil { + return "", err + } + + line := strings.Split(string(data), "\n")[0] + items := strings.Fields(line) + + rawFqdn := items[len(items)-1] + rawFqdn = strings.TrimPrefix(rawFqdn, "\x1b[0;33m") + rawFqdn = strings.TrimSuffix(rawFqdn, "\x1b[0m") + + fqdn, err := url.Parse(rawFqdn) + if err != nil { + return "", err + } + return strings.ToLower(fqdn.Host), nil +} + +// Metrics returns the metrics for a specific kube-apiserver. +func (kr *KubectlRunner) Metrics(ctx context.Context, timeout time.Duration, fqdn, ip string) ([]byte, error) { + args := []string{} + if kr.kubeCfgPath != "" { + args = append(args, "--kubeconfig", kr.kubeCfgPath) + } + args = append(args, "get", "--raw", "/metrics") + + var result []byte + + merr := mountns.Executes(func() error { + newETCHostFile, cleanup, err := CreateTempFileWithContent([]byte(fmt.Sprintf("%s %s\n", ip, fqdn))) + if err != nil { + return err + } + defer cleanup() + + target := "/etc/hosts" + + err = unix.Mount(newETCHostFile, target, "none", unix.MS_BIND, "") + if err != nil { + return fmt.Errorf("failed to mount %s on %s: %w", + newETCHostFile, target, err) + } + defer func() { + derr := unix.Unmount(target, 0) + if derr != nil { + klog.Warningf("failed umount %s", target) + } + }() + + result, err = runCommand(ctx, timeout, "kubectl", args) + return err + }) + return result, merr +} + +// Wait runs wait subcommand. 
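+// Both condition and target are required. waitTimeout is passed through as
+// kubectl's --timeout value, while timeout bounds the whole kubectl
+// invocation.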
+func (kr *KubectlRunner) Wait(ctx context.Context, timeout time.Duration, condition, waitTimeout, target string) error { + if condition == "" { + return fmt.Errorf("condition is required") + } + + if target == "" { + return fmt.Errorf("target is required") + } + + args := []string{} + if kr.kubeCfgPath != "" { + args = append(args, "--kubeconfig", kr.kubeCfgPath) + } + if kr.namespace != "" { + args = append(args, "-n", kr.namespace) + } + + args = append(args, "wait", "--for="+condition) + if waitTimeout != "" { + args = append(args, "--timeout="+waitTimeout) + } + args = append(args, target) + + _, err := runCommand(ctx, timeout, "kubectl", args) + return err +} + +// CreateNamespace creates a new namespace. +func (kr *KubectlRunner) CreateNamespace(ctx context.Context, timeout time.Duration, name string) error { + args := []string{} + if kr.kubeCfgPath != "" { + args = append(args, "--kubeconfig", kr.kubeCfgPath) + } + args = append(args, "create", "namespace", name) + + _, err := runCommand(ctx, timeout, "kubectl", args) + return err +} + +// DeleteNamespace delete a namespace. +func (kr *KubectlRunner) DeleteNamespace(ctx context.Context, timeout time.Duration, name string) error { + args := []string{} + if kr.kubeCfgPath != "" { + args = append(args, "--kubeconfig", kr.kubeCfgPath) + } + args = append(args, "delete", "namespace", name) + + _, err := runCommand(ctx, timeout, "kubectl", args) + return err +} + +// Apply runs apply subcommand. +func (kr *KubectlRunner) Apply(ctx context.Context, timeout time.Duration, filePath string) error { + args := []string{} + if kr.kubeCfgPath != "" { + args = append(args, "--kubeconfig", kr.kubeCfgPath) + } + if kr.namespace != "" { + args = append(args, "-n", kr.namespace) + } + args = append(args, "apply", "-f", filePath) + + _, err := runCommand(ctx, timeout, "kubectl", args) + return err +} + +// Delete runs delete subcommand. +func (kr *KubectlRunner) Delete(ctx context.Context, timeout time.Duration, filePath string) error { + args := []string{} + if kr.kubeCfgPath != "" { + args = append(args, "--kubeconfig", kr.kubeCfgPath) + } + if kr.namespace != "" { + args = append(args, "-n", kr.namespace) + } + args = append(args, "delete", "-f", filePath) + + _, err := runCommand(ctx, timeout, "kubectl", args) + return err +} diff --git a/contrib/internal/utils/utils.go b/contrib/internal/utils/utils.go new file mode 100644 index 0000000..78fac17 --- /dev/null +++ b/contrib/internal/utils/utils.go @@ -0,0 +1,137 @@ +package utils + +import ( + "context" + "fmt" + "net" + "os" + "os/exec" + "sort" + "strings" + "syscall" + "time" + + "github.com/Azure/kperf/contrib/internal/manifests" + + "k8s.io/klog/v2" +) + +// RepeatJobWith3KPod repeats to deploy 3k pods. 
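+// It creates the target namespace, then repeatedly applies the embedded
+// workload/3kpod.job.yaml Job, waits up to 15 minutes for it to complete,
+// deletes it, and sleeps for the given duration between iterations, until
+// the context is cancelled; the namespace is removed on return.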
+func RepeatJobWith3KPod(ctx context.Context, kubeCfgPath string, namespace string, internal time.Duration) { + klog.V(0).Info("Repeat to create job with 3k pods") + + target := "workload/3kpod.job.yaml" + + data, err := manifests.FS.ReadFile(target) + if err != nil { + panic(fmt.Errorf("unexpected error when read %s from embed memory: %v", + target, err)) + } + + jobFile, cleanup, err := CreateTempFileWithContent(data) + if err != nil { + panic(fmt.Errorf("unexpected error when create job yaml: %v", err)) + } + defer cleanup() + + kr := NewKubectlRunner(kubeCfgPath, namespace) + + klog.V(0).Infof("Creating namespace %s", namespace) + err = kr.CreateNamespace(ctx, 5*time.Minute, namespace) + if err != nil { + panic(fmt.Errorf("failed to create a new namespace %s: %v", namespace, err)) + } + + defer func() { + klog.V(0).Infof("Cleanup namespace %s", namespace) + err := kr.DeleteNamespace(context.TODO(), 5*time.Minute, namespace) + if err != nil { + klog.V(0).ErrorS(err, "failed to cleanup", "namespace", namespace) + } + }() + + retryInterval := 5 * time.Second + for { + select { + case <-ctx.Done(): + klog.V(0).Info("Stop creating job") + return + default: + } + + time.Sleep(retryInterval) + + aerr := kr.Apply(ctx, 5*time.Minute, jobFile) + if aerr != nil { + klog.V(0).ErrorS(aerr, "failed to apply, retry after 5 seconds", "job", target) + continue + } + + werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs") + if werr != nil { + klog.V(0).ErrorS(werr, "failed to wait", "job", target) + } + + derr := kr.Delete(ctx, 5*time.Minute, jobFile) + if derr != nil { + klog.V(0).ErrorS(derr, "failed to delete", "job", target) + } + time.Sleep(internal) + } +} + +// NSLookup returns ips for URL. +func NSLookup(domainURL string) ([]string, error) { + ips, err := net.LookupHost(domainURL) + if err != nil { + return nil, err + } + sort.Strings(ips) + return ips, nil +} + +// runCommand runs command with Pdeathsig. +func runCommand(ctx context.Context, timeout time.Duration, cmd string, args []string) ([]byte, error) { + var cancel context.CancelFunc + if timeout != 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + c := exec.CommandContext(ctx, cmd, args...) + c.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL} + + klog.V(2).Infof("[CMD] %s", c.String()) + output, err := c.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("failed to invoke %s:\n (output: %s): %w", + c.String(), strings.TrimSpace(string(output)), err) + } + return output, nil +} + +// CreateTempFileWithContent creates temporary file with data. 
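+// It returns the file's path and a cleanup function that removes the file;
+// the caller is responsible for invoking the cleanup.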
+func CreateTempFileWithContent(data []byte) (_name string, _cleanup func() error, retErr error) {
+    f, err := os.CreateTemp("", "temp*")
+    if err != nil {
+        return "", nil, fmt.Errorf("failed to create temporary file: %w", err)
+    }
+
+    fName := f.Name()
+    defer func() {
+        if retErr != nil {
+            os.RemoveAll(fName)
+        }
+    }()
+
+    _, err = f.Write(data)
+    f.Close()
+    if err != nil {
+        return "", nil, fmt.Errorf("failed to write temporary file %s: %w",
+            fName, err)
+    }
+
+    return fName, func() error {
+        return os.RemoveAll(fName)
+    }, nil
+}

From 46992023ef954e6967ae4d1104f3fa28170bf254 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Thu, 28 Mar 2024 17:11:03 +0800
Subject: [PATCH 5/7] contrib: introduce internal pkg manifest

Signed-off-by: Wei Fu
---
 .../manifests/loadprofile/ekswarmup.yaml | 31 +++++++++++++++++++
 contrib/internal/manifests/manifest.go   |  9 ++++++
 .../manifests/workload/3kpod.job.yaml    | 31 +++++++++++++++++++
 3 files changed, 71 insertions(+)
 create mode 100644 contrib/internal/manifests/loadprofile/ekswarmup.yaml
 create mode 100644 contrib/internal/manifests/manifest.go
 create mode 100644 contrib/internal/manifests/workload/3kpod.job.yaml

diff --git a/contrib/internal/manifests/loadprofile/ekswarmup.yaml b/contrib/internal/manifests/loadprofile/ekswarmup.yaml
new file mode 100644
index 0000000..df76fcb
--- /dev/null
+++ b/contrib/internal/manifests/loadprofile/ekswarmup.yaml
@@ -0,0 +1,31 @@
+count: 10
+loadProfile:
+  version: 1
+  description: "warmup"
+  spec:
+    rate: 10
+    total: 8000
+    conns: 10
+    client: 100
+    contentType: json
+    disableHTTP2: false
+    maxRetries: 0
+    requests:
+      - staleList:
+          version: v1
+          resource: pods
+          limit: 500
+        shares: 1000 # chance 1000 / (1000 + 100 + 100)
+      - quorumList:
+          version: v1
+          resource: pods
+          limit: 1000
+        shares: 100 # chance 100 / (1000 + 100 + 100)
+      - quorumList:
+          version: v1
+          resource: events
+          limit: 1000
+        shares: 100 # chance 100 / (1000 + 100 + 100)
+nodeAffinity:
+  node.kubernetes.io/instance-type:
+    - m4.4xlarge

diff --git a/contrib/internal/manifests/manifest.go b/contrib/internal/manifests/manifest.go
new file mode 100644
index 0000000..f3006b9
--- /dev/null
+++ b/contrib/internal/manifests/manifest.go
@@ -0,0 +1,9 @@
+package manifests
+
+import "embed"
+
+// FS embeds the manifests
+//
+//go:embed workload/*
+//go:embed loadprofile/*
+var FS embed.FS

diff --git a/contrib/internal/manifests/workload/3kpod.job.yaml b/contrib/internal/manifests/workload/3kpod.job.yaml
new file mode 100644
index 0000000..75221c3
--- /dev/null
+++ b/contrib/internal/manifests/workload/3kpod.job.yaml
@@ -0,0 +1,31 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: batchjobs
+spec:
+  completions: 3000
+  parallelism: 100
+  template:
+    metadata:
+      labels:
+        app: fake-pod
+    spec:
+      restartPolicy: Never
+      affinity:
+        nodeAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+            nodeSelectorTerms:
+              - matchExpressions:
+                  - key: type
+                    operator: In
+                    values:
+                      - kperf-virtualnodes
+      # A taint is added automatically to Nodes created by kperf.
+      # You can either remove the taint from the Node or keep this toleration.
+      tolerations:
+        - key: "kperf.io/nodepool"
+          operator: "Exists"
+          effect: "NoSchedule"
+      containers:
+        - name: fake-container
+          image: fake-image

From b392b8c685bd360dd725a6d1e5a4d9a453d5dd56 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Thu, 28 Mar 2024 17:18:52 +0800
Subject: [PATCH 6/7] contrib: introduce ekswarmup subcommand

Signed-off-by: Wei Fu
---
 .../runkperf/commands/ekswarmup/command.go | 292 ++++++++++++++++++
 contrib/cmd/runkperf/commands/root.go      |   6 +-
 2 files changed, 297 insertions(+), 1 deletion(-)
 create mode 100644 contrib/cmd/runkperf/commands/ekswarmup/command.go

diff --git a/contrib/cmd/runkperf/commands/ekswarmup/command.go b/contrib/cmd/runkperf/commands/ekswarmup/command.go
new file mode 100644
index 0000000..35d3e18
--- /dev/null
+++ b/contrib/cmd/runkperf/commands/ekswarmup/command.go
@@ -0,0 +1,292 @@
+package ekswarmup
+
+import (
+    "context"
+    "fmt"
+    "strconv"
+    "strings"
+    "sync"
+    "time"
+
+    "github.com/Azure/kperf/contrib/internal/manifests"
+    "github.com/Azure/kperf/contrib/internal/utils"
+
+    "github.com/urfave/cli"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/tools/clientcmd"
+    "k8s.io/klog/v2"
+)
+
+// Command represents ekswarmup subcommand.
+var Command = cli.Command{
+    Name:  "ekswarmup",
+    Usage: "Warm up an EKS cluster and try to scale its kube-apiservers to at least 8 cores",
+    Flags: []cli.Flag{
+        cli.StringFlag{
+            Name:  "kubeconfig",
+            Usage: "Path to the kubeconfig file",
+        },
+        cli.StringFlag{
+            Name:  "runner-image",
+            Usage: "The runner's container image",
+            // TODO(weifu):
+            //
+            // We should build a release pipeline so that we can use a
+            // fixed public release image as the default value.
+            // Right now, the image must be set manually.
+            Required: true,
+        },
+    },
+    Action: func(cliCtx *cli.Context) (retErr error) {
+        ctx := context.Background()
+
+        kubeCfgPath := cliCtx.String("kubeconfig")
+        runnerImage := cliCtx.String("runner-image")
+
+        perr := patchEKSDaemonsetWithoutToleration(ctx, kubeCfgPath)
+        if perr != nil {
+            return perr
+        }
+
+        cores, ferr := fetchAPIServerCores(ctx, kubeCfgPath)
+        if ferr == nil {
+            if isReady(cores) {
+                klog.V(0).Infof("apiserver resource is ready: %v", cores)
+                return nil
+            }
+        } else {
+            klog.V(0).ErrorS(ferr, "failed to fetch apiserver cores")
+        }
+
+        delNP, err := deployWarmupVirtualNodepool(ctx, kubeCfgPath)
+        if err != nil {
+            return err
+        }
+        defer func() {
+            derr := delNP()
+            if retErr == nil {
+                retErr = derr
+            }
+        }()
+
+        var wg sync.WaitGroup
+        wg.Add(1)
+
+        jobCtx, jobCancel := context.WithCancel(ctx)
+        go func() {
+            defer wg.Done()
+
+            utils.RepeatJobWith3KPod(jobCtx, kubeCfgPath, "warmupjob", 5*time.Second)
+        }()
+
+        derr := deployWarmupRunnerGroup(ctx, kubeCfgPath, runnerImage)
+        jobCancel()
+        wg.Wait()
+
+        cores, ferr = fetchAPIServerCores(ctx, kubeCfgPath)
+        if ferr == nil {
+            if isReady(cores) {
+                klog.V(0).Infof("apiserver resource is ready: %v", cores)
+                return nil
+            }
+        }
+        return derr
+    },
+}
+
+// isReady returns true if at least two instances have 8 or more cores.
+func isReady(cores map[string]int) bool {
+    n := 0
+    for _, c := range cores {
+        if c >= 8 {
+            n++
+        }
+    }
+    return n >= 2
+}
+
+// deployWarmupVirtualNodepool deploys a virtual nodepool on m4.2xlarge nodes for warmup.
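+//
+// It deletes any leftover nodepool with the same name, then creates a
+// 100-node virtual nodepool (110 pods per node) pinned to m4.2xlarge
+// instances, reusing the providerID of an existing m4.large placeholder
+// node, and returns a function that tears the nodepool down again.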
+func deployWarmupVirtualNodepool(ctx context.Context, kubeCfgPath string) (func() error, error) {
+    target := "warmup"
+    kr := utils.NewKperfRunner(kubeCfgPath, "")
+
+    klog.V(0).InfoS("Deploying virtual nodepool", "name", target)
+    sharedProviderID, err := fetchPlaceholderNodeProviderID(ctx, kubeCfgPath)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get placeholder providerID: %w", err)
+    }
+
+    klog.V(0).InfoS("Trying to delete", "nodepool", target)
+    if err = kr.DeleteNodepool(ctx, 0, target); err != nil {
+        klog.V(0).ErrorS(err, "failed to delete", "nodepool", target)
+    }
+
+    err = kr.NewNodepool(ctx, 0, target, 100, 110,
+        "node.kubernetes.io/instance-type=m4.2xlarge", sharedProviderID)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create nodepool %s: %w", target, err)
+    }
+
+    return func() error {
+        return kr.DeleteNodepool(ctx, 0, target)
+    }, nil
+}
+
+// deployWarmupRunnerGroup deploys warmup runner group to trigger resource update.
+func deployWarmupRunnerGroup(ctx context.Context, kubeCfgPath string, runnerImage string) error {
+    klog.V(0).Info("Deploying warmup runner group")
+
+    target := "loadprofile/ekswarmup.yaml"
+
+    data, err := manifests.FS.ReadFile(target)
+    if err != nil {
+        return fmt.Errorf("failed to read %s from embed memory: %w", target, err)
+    }
+
+    rgCfgFile, cleanup, err := utils.CreateTempFileWithContent(data)
+    if err != nil {
+        return fmt.Errorf("failed to create temporary file for %s: %w", target, err)
+    }
+    defer cleanup()
+
+    kr := utils.NewKperfRunner(kubeCfgPath, runnerImage)
+
+    klog.V(0).Info("Deleting existing runner group")
+    derr := kr.RGDelete(ctx, 0)
+    if derr != nil {
+        klog.V(0).ErrorS(derr, "failed to delete existing runner group")
+    }
+
+    rerr := kr.RGRun(ctx, 0, rgCfgFile, "workload-low:1000")
+    if rerr != nil {
+        return fmt.Errorf("failed to deploy warmup runner group: %w", rerr)
+    }
+
+    klog.V(0).Info("Waiting for warmup runner group's result")
+    for {
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        default:
+        }
+
+        res, err := kr.RGResult(ctx, 1*time.Minute)
+        if err != nil {
+            klog.V(0).ErrorS(err, "failed to fetch warmup runner group's result")
+            continue
+        }
+        klog.V(0).Infof("Warmup runner group's result: %s", res)
+
+        klog.V(0).Info("Deleting warmup runner group")
+        if derr := kr.RGDelete(ctx, 0); derr != nil {
+            klog.V(0).ErrorS(derr, "failed to delete existing runner group")
+        }
+        return nil
+    }
+}
+
+func fetchPlaceholderNodeProviderID(ctx context.Context, kubeCfgPath string) (string, error) {
+    clientset := mustClientset(kubeCfgPath)
+
+    // TODO(weifu): make it configurable
+    label := "node.kubernetes.io/instance-type=m4.large"
+
+    nodeCli := clientset.CoreV1().Nodes()
+    listResp, err := nodeCli.List(ctx, metav1.ListOptions{LabelSelector: label})
+    if err != nil {
+        return "", fmt.Errorf("failed to list nodes with label %s: %w", label, err)
+    }
+
+    if len(listResp.Items) == 0 {
+        return "", fmt.Errorf("requires a placeholder node with m4.large flavor")
+    }
+    return listResp.Items[0].Spec.ProviderID, nil
+}
+
+// patchEKSDaemonsetWithoutToleration removes tolerations to avoid pods being
+// scheduled to virtual nodes.
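+//
+// Only the aws-node and kube-proxy daemonsets in kube-system are patched;
+// clearing their tolerations keeps their pods off the tainted kperf virtual
+// nodes.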
+func patchEKSDaemonsetWithoutToleration(ctx context.Context, kubeCfgPath string) error {
+    clientset := mustClientset(kubeCfgPath)
+
+    ds := clientset.AppsV1().DaemonSets("kube-system")
+    for _, dn := range []string{"aws-node", "kube-proxy"} {
+        d, err := ds.Get(ctx, dn, metav1.GetOptions{})
+        if err != nil {
+            return fmt.Errorf("failed to get daemonset %s: %w", dn, err)
+        }
+
+        d.Spec.Template.Spec.Tolerations = []corev1.Toleration{}
+        d.ResourceVersion = ""
+
+        _, err = ds.Update(ctx, d, metav1.UpdateOptions{})
+        if err != nil {
+            return fmt.Errorf("failed to remove tolerations for daemonset %s: %w", dn, err)
+        }
+    }
+    return nil
+}
+
+// fetchAPIServerCores fetches the core number for each kube-apiserver.
+func fetchAPIServerCores(ctx context.Context, kubeCfgPath string) (map[string]int, error) {
+    klog.V(0).Info("Fetching apiserver's cores")
+
+    kr := utils.NewKubectlRunner(kubeCfgPath, "")
+    fqdn, err := kr.FQDN(ctx, 0)
+    if err != nil {
+        return nil, fmt.Errorf("failed to get cluster fqdn: %w", err)
+    }
+
+    ips, nerr := utils.NSLookup(fqdn)
+    if nerr != nil {
+        return nil, fmt.Errorf("failed to get dns records of fqdn %s: %w", fqdn, nerr)
+    }
+
+    res := map[string]int{}
+    for _, ip := range ips {
+        cores, err := func() (int, error) {
+            data, err := kr.Metrics(ctx, 0, fqdn, ip)
+            if err != nil {
+                return 0, fmt.Errorf("failed to get metrics for ip %s: %w", ip, err)
+            }
+
+            lines := strings.Split(string(data), "\n")
+            for _, line := range lines {
+                if strings.HasPrefix(line, "go_sched_gomaxprocs_threads") {
+                    vInStr := strings.Fields(line)[1]
+                    v, err := strconv.Atoi(vInStr)
+                    if err != nil {
+                        return 0, fmt.Errorf("failed to parse go_sched_gomaxprocs_threads %s: %w", line, err)
+                    }
+                    return v, nil
+                }
+            }
+            return 0, fmt.Errorf("failed to get go_sched_gomaxprocs_threads")
+        }()
+        if err != nil {
+            klog.V(0).ErrorS(err, "failed to get cores", "ip", ip)
+            continue
+        }
+        klog.V(0).InfoS("apiserver cores", "ip", ip, "cores", cores)
+        res[ip] = cores
+    }
+    if len(res) < 2 {
+        return nil, fmt.Errorf("expected at least two instances")
+    }
+    return res, nil
+}
+
+// mustClientset returns a kubernetes clientset, panicking on error.
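+// It builds a client-go clientset from the given kubeconfig path and panics
+// if the config or client cannot be created, so callers can assume a usable
+// client.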
+func mustClientset(kubeCfgPath string) *kubernetes.Clientset { + config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath) + if err != nil { + panic(fmt.Errorf("failed to build client-go config: %w", err)) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(fmt.Errorf("failed to build client-go rest client: %w", err)) + } + return clientset +} diff --git a/contrib/cmd/runkperf/commands/root.go b/contrib/cmd/runkperf/commands/root.go index fdbd59d..f5bcdeb 100644 --- a/contrib/cmd/runkperf/commands/root.go +++ b/contrib/cmd/runkperf/commands/root.go @@ -6,6 +6,8 @@ import ( "os" "strconv" + "github.com/Azure/kperf/contrib/cmd/runkperf/commands/ekswarmup" + "github.com/urfave/cli" "k8s.io/klog/v2" ) @@ -15,7 +17,9 @@ func App() *cli.App { return &cli.App{ Name: "runkperf", // TODO: add more fields - Commands: []cli.Command{}, + Commands: []cli.Command{ + ekswarmup.Command, + }, Flags: []cli.Flag{ cli.StringFlag{ Name: "v", From c68472538bfdfe5d5614723b7c952976f1964e3a Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 28 Mar 2024 17:28:29 +0800 Subject: [PATCH 7/7] *: fix lint issue Signed-off-by: Wei Fu --- contrib/cmd/runkperf/commands/ekswarmup/command.go | 2 +- contrib/internal/mountns/ns.go | 11 ++++------- contrib/internal/utils/kubectl_cmd.go | 2 +- contrib/internal/utils/utils.go | 4 ++-- 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/contrib/cmd/runkperf/commands/ekswarmup/command.go b/contrib/cmd/runkperf/commands/ekswarmup/command.go index 35d3e18..a38ff20 100644 --- a/contrib/cmd/runkperf/commands/ekswarmup/command.go +++ b/contrib/cmd/runkperf/commands/ekswarmup/command.go @@ -149,7 +149,7 @@ func deployWarmupRunnerGroup(ctx context.Context, kubeCfgPath string, runnerImag if err != nil { return fmt.Errorf("failed to create temporary file for %s: %w", target, err) } - defer cleanup() + defer func() { _ = cleanup() }() kr := utils.NewKperfRunner(kubeCfgPath, runnerImage) diff --git a/contrib/internal/mountns/ns.go b/contrib/internal/mountns/ns.go index 8d12f4b..85a629a 100644 --- a/contrib/internal/mountns/ns.go +++ b/contrib/internal/mountns/ns.go @@ -18,20 +18,17 @@ func Executes(run func() error) error { wg.Add(1) var innerErr error - go func() (retErr error) { + go func() { defer wg.Done() - defer func() { - innerErr = retErr - }() - runtime.LockOSThread() err := unix.Unshare(unix.CLONE_FS | unix.CLONE_NEWNS) if err != nil { - return fmt.Errorf("failed to create a new mount namespace: %w", err) + innerErr = fmt.Errorf("failed to create a new mount namespace: %w", err) + return } - return run() + innerErr = run() }() wg.Wait() diff --git a/contrib/internal/utils/kubectl_cmd.go b/contrib/internal/utils/kubectl_cmd.go index 8942f10..d7e8eaf 100644 --- a/contrib/internal/utils/kubectl_cmd.go +++ b/contrib/internal/utils/kubectl_cmd.go @@ -68,7 +68,7 @@ func (kr *KubectlRunner) Metrics(ctx context.Context, timeout time.Duration, fqd if err != nil { return err } - defer cleanup() + defer func() { _ = cleanup() }() target := "/etc/hosts" diff --git a/contrib/internal/utils/utils.go b/contrib/internal/utils/utils.go index 78fac17..604c578 100644 --- a/contrib/internal/utils/utils.go +++ b/contrib/internal/utils/utils.go @@ -32,7 +32,7 @@ func RepeatJobWith3KPod(ctx context.Context, kubeCfgPath string, namespace strin if err != nil { panic(fmt.Errorf("unexpected error when create job yaml: %v", err)) } - defer cleanup() + defer func() { _ = cleanup() }() kr := NewKubectlRunner(kubeCfgPath, namespace) @@ -120,7 +120,7 @@ func 
CreateTempFileWithContent(data []byte) (_name string, _cleanup func() error fName := f.Name() defer func() { if retErr != nil { - os.RemoveAll(fName) + _ = os.RemoveAll(fName) } }()