Skip to content

Commit

Permalink
feat: move pkg oci to Zarf and log during health checks (#3106)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Abro <[email protected]>
  • Loading branch information
AustinAbro321 authored Oct 17, 2024
1 parent 99209cf commit 537ce04
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 53 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/anchore/syft v1.14.0
github.com/avast/retry-go/v4 v4.6.0
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1
github.com/defenseunicorns/pkg/kubernetes v0.3.0
github.com/defenseunicorns/pkg/oci v1.0.2
github.com/derailed/k9s v0.32.5
github.com/distribution/distribution/v3 v3.0.0-beta.1
Expand Down Expand Up @@ -556,7 +555,7 @@ require (
modernc.org/memory v1.8.0 // indirect
modernc.org/sqlite v1.33.1 // indirect
oras.land/oras-go v1.2.5 // indirect
sigs.k8s.io/controller-runtime v0.19.0 // indirect
sigs.k8s.io/controller-runtime v0.19.0
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/kustomize/kustomize/v5 v5.4.2 // indirect
sigs.k8s.io/release-utils v0.8.4 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -621,8 +621,6 @@ github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6 h1:gw
github.com/defenseunicorns/gojsonschema v0.0.0-20231116163348-e00f069122d6/go.mod h1:StKLYMmPj1R5yIs6CK49EkcW1TvUYuw5Vri+LRk7Dy8=
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1 h1:j08rz9vhyD9Bs+yKiyQMY2tSSejXRMxTqEObZ5M1Wbk=
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1/go.mod h1:u1PAqOICZyiGIVA2v28g55bQH1GiAt0Bc4U9/rnWQvQ=
github.com/defenseunicorns/pkg/kubernetes v0.3.0 h1:f4VSIaUdvn87/dhiZvRbUfHhcHa8bKia6aU0WcvPbYg=
github.com/defenseunicorns/pkg/kubernetes v0.3.0/go.mod h1:FsuKQGpPZOnZWifBse7v787+avtIu2lte5LTsaojDkY=
github.com/defenseunicorns/pkg/oci v1.0.2 h1:JRdFbKnJQiGVsMUWmcmm0ZS8aBmmAORXLGSAGkIGhBQ=
github.com/defenseunicorns/pkg/oci v1.0.2/go.mod h1:z11UFenAd4HQRucaEp0uhoccor/6zbQiXEQq+Z7vtI0=
github.com/deitch/magic v0.0.0-20230404182410-1ff89d7342da h1:ZOjWpVsFZ06eIhnh4mkaceTiVoktdU67+M7KDHJ268M=
Expand Down
102 changes: 99 additions & 3 deletions src/internal/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ package healthchecks

import (
"context"
"errors"
"fmt"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"
"github.com/zarf-dev/zarf/src/api/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/aggregator"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/collector"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
"sigs.k8s.io/cli-utils/pkg/object"
)

// Run waits for a list of objects to be reconciled
// Run waits for a list of Zarf healthchecks to reach a ready state.
func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1alpha1.NamespacedObjectKindReference) error {
objs := []object.ObjMetadata{}
for _, hc := range healthChecks {
Expand All @@ -32,9 +38,99 @@ func Run(ctx context.Context, watcher watcher.StatusWatcher, healthChecks []v1al
}
objs = append(objs, obj)
}
err := pkgkubernetes.WaitForReady(ctx, watcher, objs)
err := WaitForReady(ctx, watcher, objs)
if err != nil {
return err
}
return nil
}

// WaitForReadyRuntime waits for all of the objects to reach a ready state.
func WaitForReadyRuntime(ctx context.Context, sw watcher.StatusWatcher, robjs []runtime.Object) error {
objs := []object.ObjMetadata{}
for _, robj := range robjs {
obj, err := object.RuntimeToObjMeta(robj)
if err != nil {
return err
}
objs = append(objs, obj)
}
return WaitForReady(ctx, sw, objs)
}

// WaitForReady waits for all of the objects to reach a ready state.
func WaitForReady(ctx context.Context, sw watcher.StatusWatcher, objs []object.ObjMetadata) error {
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

eventCh := sw.Watch(cancelCtx, objs, watcher.Options{})
statusCollector := collector.NewResourceStatusCollector(objs)
done := statusCollector.ListenWithObserver(eventCh, collector.ObserverFunc(
func(statusCollector *collector.ResourceStatusCollector, _ event.Event) {
rss := []*event.ResourceStatus{}
for _, rs := range statusCollector.ResourceStatuses {
if rs == nil {
continue
}
rss = append(rss, rs)
}
desired := status.CurrentStatus
if aggregator.AggregateStatus(rss, desired) == desired {
cancel()
return
}
}),
)
<-done

if statusCollector.Error != nil {
return statusCollector.Error
}

// Only check parent context error, otherwise we would error when desired status is achieved.
if ctx.Err() != nil {
errs := []error{}
for _, id := range objs {
rs := statusCollector.ResourceStatuses[id]
switch rs.Status {
case status.CurrentStatus:
case status.NotFoundStatus:
errs = append(errs, fmt.Errorf("%s: %s not found", rs.Identifier.Name, rs.Identifier.GroupKind.Kind))
default:
errs = append(errs, fmt.Errorf("%s: %s not ready", rs.Identifier.Name, rs.Identifier.GroupKind.Kind))
}
}
errs = append(errs, ctx.Err())
return errors.Join(errs...)
}

return nil
}

// ImmediateWatcher should only be used for testing and returns the set status immediately.
type ImmediateWatcher struct {
status status.Status
}

// NewImmediateWatcher returns a ImmediateWatcher.
func NewImmediateWatcher(status status.Status) *ImmediateWatcher {
return &ImmediateWatcher{
status: status,
}
}

// Watch watches the given objects and immediately returns the configured status.
func (w *ImmediateWatcher) Watch(_ context.Context, objs object.ObjMetadataSet, _ watcher.Options) <-chan event.Event {
eventCh := make(chan event.Event, len(objs))
for _, obj := range objs {
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: &event.ResourceStatus{
Identifier: obj,
Status: w.status,
},
}
}
close(eventCh)
return eventCh
}
46 changes: 25 additions & 21 deletions src/internal/healthchecks/healthchecks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package healthchecks

import (
"context"
"errors"
"testing"
"time"

Expand Down Expand Up @@ -45,19 +46,19 @@ metadata:
func TestRunHealthChecks(t *testing.T) {
t.Parallel()
tests := []struct {
name string
podYaml string
expectErr error
name string
podYamls []string
expectErrs []error
}{
{
name: "Pod is running",
podYaml: podCurrentYaml,
expectErr: nil,
name: "Pod is ready",
podYamls: []string{podCurrentYaml},
expectErrs: nil,
},
{
name: "Pod is never ready",
podYaml: podYaml,
expectErr: context.DeadlineExceeded,
name: "One pod is never ready",
podYamls: []string{podYaml, podCurrentYaml},
expectErrs: []error{errors.New("in-progress-pod: Pod not ready"), context.DeadlineExceeded},
},
}

Expand All @@ -70,24 +71,27 @@ func TestRunHealthChecks(t *testing.T) {
)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(tt.podYaml), &m)
require.NoError(t, err)
pod := &unstructured.Unstructured{Object: m}
statusWatcher := watcher.NewDefaultStatusWatcher(fakeClient, fakeMapper)
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
require.NoError(t, fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace()))
objs := []v1alpha1.NamespacedObjectKindReference{
{
objs := []v1alpha1.NamespacedObjectKindReference{}
for _, podYaml := range tt.podYamls {
m := make(map[string]interface{})
err := yaml.Unmarshal([]byte(podYaml), &m)
require.NoError(t, err)
pod := &unstructured.Unstructured{Object: m}
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
err = fakeClient.Tracker().Create(podGVR, pod, pod.GetNamespace())
require.NoError(t, err)
objs = append(objs, v1alpha1.NamespacedObjectKindReference{
APIVersion: pod.GetAPIVersion(),
Kind: pod.GetKind(),
Namespace: pod.GetNamespace(),
Name: pod.GetName(),
},
})
}
err = Run(ctx, statusWatcher, objs)
if tt.expectErr != nil {
require.ErrorIs(t, err, tt.expectErr)

err := Run(ctx, statusWatcher, objs)
if tt.expectErrs != nil {
require.EqualError(t, err, errors.Join(tt.expectErrs...).Error())
return
}
require.NoError(t, err)
Expand Down
14 changes: 4 additions & 10 deletions src/internal/packager/helm/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"helm.sh/helm/v3/pkg/releaseutil"
"helm.sh/helm/v3/pkg/storage/driver"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/yaml"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -129,20 +129,14 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
return nil, "", fmt.Errorf("unable to build the resource list: %w", err)
}

healthChecks := []v1alpha1.NamespacedObjectKindReference{}
runtimeObjs := []runtime.Object{}
for _, resource := range resourceList {
apiVersion, kind := resource.Object.GetObjectKind().GroupVersionKind().ToAPIVersionAndKind()
healthChecks = append(healthChecks, v1alpha1.NamespacedObjectKindReference{
APIVersion: apiVersion,
Kind: kind,
Name: resource.Name,
Namespace: resource.Namespace,
})
runtimeObjs = append(runtimeObjs, resource.Object)
}
if !h.chart.NoWait {
// Ensure we don't go past the timeout by using a context initialized with the helm timeout
spinner.Updatef("Running health checks")
if err := healthchecks.Run(helmCtx, h.cluster.Watcher, healthChecks); err != nil {
if err := healthchecks.WaitForReadyRuntime(helmCtx, h.cluster.Watcher, runtimeObjs); err != nil {
return nil, "", err
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/internal/packager/helm/zarf.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/cli-utils/pkg/object"

pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/internal/packager/template"
"github.com/zarf-dev/zarf/src/pkg/cluster"
"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -61,7 +60,7 @@ func (h *Helm) UpdateZarfRegistryValues(ctx context.Context) error {
}
waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReady(waitCtx, h.cluster.Watcher, objs)
err = healthchecks.WaitForReady(waitCtx, h.cluster.Watcher, objs)
if err != nil {
return err
}
Expand Down Expand Up @@ -157,7 +156,7 @@ func (h *Helm) UpdateZarfAgentValues(ctx context.Context) error {
}
waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReady(waitCtx, h.cluster.Watcher, objs)
err = healthchecks.WaitForReady(waitCtx, h.cluster.Watcher, objs)
if err != nil {
return err
}
Expand Down
41 changes: 38 additions & 3 deletions src/pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"

"github.com/avast/retry-go/v4"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

"github.com/zarf-dev/zarf/src/pkg/message"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

const (
Expand Down Expand Up @@ -76,11 +78,11 @@ func NewClusterWithWait(ctx context.Context) (*Cluster, error) {
// NewCluster creates a new Cluster instance and validates connection to the cluster by fetching the Kubernetes version.
func NewCluster() (*Cluster, error) {
clusterErr := errors.New("unable to connect to the cluster")
clientset, config, err := pkgkubernetes.ClientAndConfig()
clientset, config, err := ClientAndConfig()
if err != nil {
return nil, errors.Join(clusterErr, err)
}
watcher, err := pkgkubernetes.WatcherForConfig(config)
watcher, err := WatcherForConfig(config)
if err != nil {
return nil, errors.Join(clusterErr, err)
}
Expand All @@ -96,3 +98,36 @@ func NewCluster() (*Cluster, error) {
}
return c, nil
}

// ClientAndConfig returns a Kubernetes client and the rest config used to configure the client.
func ClientAndConfig() (kubernetes.Interface, *rest.Config, error) {
loader := clientcmd.NewDefaultClientConfigLoadingRules()
clientCfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loader, nil)
cfg, err := clientCfg.ClientConfig()
if err != nil {
return nil, nil, err
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, nil, err
}
return clientset, cfg, nil
}

// WatcherForConfig returns a status watcher for the give Kubernetes configuration.
func WatcherForConfig(cfg *rest.Config) (watcher.StatusWatcher, error) {
dynamicClient, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(cfg)
if err != nil {
return nil, err
}
restMapper, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
if err != nil {
return nil, err
}
sw := watcher.NewDefaultStatusWatcher(dynamicClient, restMapper)
return sw, nil
}
4 changes: 2 additions & 2 deletions src/pkg/cluster/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

"github.com/defenseunicorns/pkg/helpers/v2"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/internal/healthchecks"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/pkg/transform"
"github.com/zarf-dev/zarf/src/pkg/utils"
Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *Cluster) StartInjection(ctx context.Context, tmpDir, imagesDir string,

waitCtx, waitCancel := context.WithTimeout(ctx, 60*time.Second)
defer waitCancel()
err = pkgkubernetes.WaitForReadyRuntime(waitCtx, c.Watcher, []runtime.Object{pod})
err = healthchecks.WaitForReadyRuntime(waitCtx, c.Watcher, []runtime.Object{pod})
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 537ce04

Please sign in to comment.