Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wait using kstatus #3043

Merged
merged 12 commits into from
Oct 9, 2024
2 changes: 2 additions & 0 deletions site/src/content/docs/ref/deploy.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ Deployments will wait for helm [post-install hooks](https://helm.sh/docs/topics/

:::

After the Helm wait completes successfully, Zarf waits for all resources in the applied chart to fully reconcile. To identify when reconciliation is achieved, Zarf uses [kstatus](https://github.com/kubernetes-sigs/cli-utils/blob/master/pkg/kstatus/README.md#kstatus). Kstatus assesses whether a resource is reconciled by checking the [status](https://kubernetes.io/docs/concepts/overview/working-with-objects/#object-spec-and-status) field. If a resource does not have a status field, kstatus considers it reconciled once it's found.

### Timeout Settings

The default timeout for Helm operations in Zarf is 15 minutes.
Expand Down
40 changes: 40 additions & 0 deletions src/internal/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2021-Present The Zarf Authors

// Package healthchecks run kstatus style health checks on a list of objects
package healthchecks

import (
"context"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)

// Run waits for a list of objects to be reconciled
func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1alpha1.NamespacedObjectKindReference) error {
objs := []object.ObjMetadata{}
for _, hc := range healthChecks {
gv, err := schema.ParseGroupVersion(hc.APIVersion)
if err != nil {
return err
}
obj := object.ObjMetadata{
GroupKind: schema.GroupKind{
Group: gv.Group,
Kind: hc.Kind,
},
Namespace: hc.Namespace,
Name: hc.Name,
}
objs = append(objs, obj)
}
err := pkgkubernetes.WaitForReady(ctx, watcher, objs)
if err != nil {
return err
}
return nil
}
96 changes: 96 additions & 0 deletions src/internal/healthchecks/healthchecks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2021-Present The Zarf Authors

// Package healthchecks run kstatus style health checks on a list of objects
package healthchecks

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

var podCurrentYaml = `
apiVersion: v1
kind: Pod
metadata:
name: good-pod
namespace: ns
status:
conditions:
- type: Ready
status: "True"
phase: Running
`

var podYaml = `
apiVersion: v1
kind: Pod
metadata:
name: in-progress-pod
namespace: ns
`

func TestRunHealthChecks(t *testing.T) {
t.Parallel()
tests := []struct {
name string
podYaml string
expectErr error
}{
{
name: "Pod is running",
podYaml: podCurrentYaml,
expectErr: nil,
},
{
name: "Pod is never ready",
podYaml: podYaml,
expectErr: context.DeadlineExceeded,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(tt.podYaml), &m)
require.NoError(t, err)
pod := &unstructured.Unstructured{Object: m}
statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
require.NoError(t, fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace()))
objs := []v1alpha1.NamespacedObjectKindReference{
{
APIVersion: pod.GetAPIVersion(),
Kind: pod.GetKind(),
Namespace: pod.GetNamespace(),
Name: pod.GetName(),
},
}
err = Run(ctx, statusWatcher, objs)
if tt.expectErr != nil {
require.ErrorIs(t, err, tt.expectErr)
return
}
require.NoError(t, err)
})
}
}
38 changes: 35 additions & 3 deletions src/internal/packager/helm/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package helm

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -24,7 +25,9 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/types"
)
Expand Down Expand Up @@ -58,28 +61,30 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
}

histClient := action.NewHistory(h.actionConfig)
var release *release.Release

var helmOpStart time.Time
err = retry.Do(func() error {
var err error

releases, histErr := histClient.Run(h.chart.ReleaseName)

spinner.Updatef("Checking for existing helm deployment")
helmOpStart = time.Now()

if errors.Is(histErr, driver.ErrReleaseNotFound) {
// No prior release, try to install it.
spinner.Updatef("Attempting chart installation")

_, err = h.installChart(ctx, postRender)
release, err = h.installChart(ctx, postRender)
} else if histErr == nil && len(releases) > 0 {
// Otherwise, there is a prior release so upgrade it.
spinner.Updatef("Attempting chart upgrade")

lastRelease := releases[len(releases)-1]

_, err = h.upgradeChart(ctx, lastRelease, postRender)
release, err = h.upgradeChart(ctx, lastRelease, postRender)
} else {
// 😭 things aren't working
return fmt.Errorf("unable to verify the chart installation status: %w", histErr)
}

Expand Down Expand Up @@ -118,6 +123,33 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
return nil, "", installErr
}

resourceList, err := h.actionConfig.KubeClient.Build(bytes.NewBufferString(release.Manifest), true)
if err != nil {
return nil, "", fmt.Errorf("unable to build the resource list: %w", err)
}

healthChecks := []v1alpha1.NamespacedObjectKindReference{}
for _, resource := range resourceList {
apiVersion, kind := resource.Object.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
healthChecks = append(healthChecks, v1alpha1.NamespacedObjectKindReference{
APIVersion: apiVersion,
Kind: kind,
Name: resource.Name,
Namespace: resource.Namespace,
})
}
if !h.chart.NoWait {
// Ensure we don't go past the timeout by getting the time since the helm operation started
healthCheckTimeout := h.timeout - time.Since(helmOpStart)
AustinAbro321 marked this conversation as resolved.
Show resolved Hide resolved
healthChecksCtx, cancel := context.WithTimeout(ctx, healthCheckTimeout)
defer cancel()
spinner.Updatef("Running health checks")
if err := healthchecks.Run(healthChecksCtx, h.cluster.Watcher, healthChecks); err != nil {
return nil, "", err
}
}
spinner.Success()

// return any collected connect strings for zarf connect.
return postRender.connectStrings, h.chart.ReleaseName, nil
}
Expand Down
31 changes: 2 additions & 29 deletions src/pkg/packager/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@ import (
"golang.org/x/sync/errgroup"

"github.com/avast/retry-go/v4"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"

"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/git"
"github.com/zarf-dev/zarf/src/internal/gitea"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/internal/packager/helm"
"github.com/zarf-dev/zarf/src/internal/packager/images"
"github.com/zarf-dev/zarf/src/internal/packager/template"
Expand Down Expand Up @@ -238,30 +235,6 @@ func (p *Packager) deployComponents(ctx context.Context) ([]types.DeployedCompon
return deployedComponents, nil
}

func runHealthChecks(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1alpha1.NamespacedObjectKindReference) error {
objs := []object.ObjMetadata{}
for _, hc := range healthChecks {
gv, err := schema.ParseGroupVersion(hc.APIVersion)
if err != nil {
return err
}
obj := object.ObjMetadata{
GroupKind: schema.GroupKind{
Group: gv.Group,
Kind: hc.Kind,
},
Namespace: hc.Namespace,
Name: hc.Name,
}
objs = append(objs, obj)
}
err := pkgkubernetes.WaitForReady(ctx, watcher, objs)
if err != nil {
return err
}
return nil
}

func (p *Packager) deployInitComponent(ctx context.Context, component v1alpha1.ZarfComponent) ([]types.InstalledChart, error) {
hasExternalRegistry := p.cfg.InitOpts.RegistryInfo.Address != ""
isSeedRegistry := component.Name == "zarf-seed-registry"
Expand Down Expand Up @@ -403,7 +376,7 @@ func (p *Packager) deployComponent(ctx context.Context, component v1alpha1.ZarfC
defer cancel()
spinner := message.NewProgressSpinner("Running health checks")
defer spinner.Stop()
if err = runHealthChecks(healthCheckContext, p.cluster.Watcher, component.HealthChecks); err != nil {
if err = healthchecks.Run(healthCheckContext, p.cluster.Watcher, component.HealthChecks); err != nil {
return nil, fmt.Errorf("health checks failed: %w", err)
}
spinner.Success()
Expand Down
89 changes: 0 additions & 89 deletions src/pkg/packager/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,12 @@
package packager

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/pkg/packager/sources"
"github.com/zarf-dev/zarf/src/types"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/yaml"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/kubectl/pkg/scheme"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

func TestGenerateValuesOverrides(t *testing.T) {
Expand Down Expand Up @@ -282,82 +272,3 @@ func TestServiceInfoFromServiceURL(t *testing.T) {
})
}
}

var podCurrentYaml = `
apiVersion: v1
kind: Pod
metadata:
name: good-pod
namespace: ns
status:
conditions:
- type: Ready
status: "True"
phase: Running
`

var podYaml = `
apiVersion: v1
kind: Pod
metadata:
name: in-progress-pod
namespace: ns
`

func yamlToUnstructured(t *testing.T, yml string) *unstructured.Unstructured {
t.Helper()
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(yml), &m)
require.NoError(t, err)
return &unstructured.Unstructured{Object: m}
}

func TestRunHealthChecks(t *testing.T) {
t.Parallel()
tests := []struct {
name string
podYaml string
expectErr error
}{
{
name: "Pod is running",
podYaml: podCurrentYaml,
expectErr: nil,
},
{
name: "Pod is never ready",
podYaml: podYaml,
expectErr: context.DeadlineExceeded,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
fakeClient := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)
fakeMapper := testutil.NewFakeRESTMapper(
v1.SchemeGroupVersion.WithKind("Pod"),
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
pod := yamlToUnstructured(t, tt.podYaml)
statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
require.NoError(t, fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace()))
objs := []v1alpha1.NamespacedObjectKindReference{
{
APIVersion: pod.GetAPIVersion(),
Kind: pod.GetKind(),
Namespace: pod.GetNamespace(),
Name: pod.GetName(),
},
}
err := runHealthChecks(ctx, statusWatcher, objs)
if tt.expectErr != nil {
require.ErrorIs(t, err, tt.expectErr)
return
}
require.NoError(t, err)
})
}
}